
The WORLD_SIZE environment variable in PyTorch is different from its definition #1790

Closed
isuyyy opened this issue Mar 31, 2023 · 43 comments

@isuyyy

isuyyy commented Mar 31, 2023

In the PyTorch documentation, it is mentioned that:

the total number of application processes running across all the nodes at one time is called the World Size

However, in the code below, the WORLD_SIZE environment variable is set to the number of replicas.

podTemplateSpec.Spec.Containers[i].Env = append(podTemplateSpec.Spec.Containers[i].Env, corev1.EnvVar{
    Name:  "WORLD_SIZE",
    Value: strconv.Itoa(int(totalReplicas)), // totalReplicas is the number of replicas (pods)
})

Why is this the case?

@isuyyy changed the title from "pytorch WORLD_SIZE env value is different from the definition" to "The WORLD_SIZE environment variable in PyTorch is different from its definition" on Mar 31, 2023
@johnugeorge
Member

This is the total number of participating nodes.

@isuyyy
Author

isuyyy commented Apr 1, 2023

This is the total number of participating nodes.

I didn't understand. What I meant was that the term 'WORLD_SIZE' in PyTorch and the PyTorch training operator have different meanings.

In fact, when we use the 'torchrun' command, it automatically overwrites the value of 'WORLD_SIZE' with nnodes * nproc_per_node.
For example, suppose there are 2 nodes, each with 4 processes. The PyTorch training operator would set the value of 'WORLD_SIZE' to 2, but when using the 'torchrun' command, the value is automatically overwritten to 8.
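For illustration, a minimal check_env.py (the script name and exact flags here are just illustrative) launched with torchrun on each node makes the overwrite visible:

# check_env.py -- a minimal sketch; run on each of the 2 nodes with something like:
#   torchrun --nnodes=2 --nproc-per-node=4 --node-rank=<0 or 1> \
#            --master-addr=<master host> --master-port=29500 check_env.py
import os

# torchrun exports these for every worker process it launches.
print(
    "WORLD_SIZE =", os.environ["WORLD_SIZE"],    # 8 (= nnodes * nproc_per_node)
    "| RANK =", os.environ["RANK"],              # 0..7, global rank of this process
    "| LOCAL_RANK =", os.environ["LOCAL_RANK"],  # 0..3, rank within this node
)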

This is quite confusing, isn't it?

Please let me know if there is anything I am misunderstanding.

@johnugeorge
Member

@isuyyy

In your case, can you take a dump of the environment where you see WORLD_SIZE set to 2 in the training pods?

@kuizhiqing
Member

kuizhiqing commented Apr 2, 2023

Actually, WORLD_SIZE is not consistent here: in PyTorch it stands for the total number of workers in the framework, but in the operator it refers to --nnodes (PET_NNODES in the env, perhaps).

@johnugeorge
Member

@kuizhiqing What do you refer to as a node here in the operator context?

@kuizhiqing
Member

@kuizhiqing What do you refer to as a node here in the operator context?

@johnugeorge I mean a pod in the operator.

Given that we have 2 nodes/machines, each with 8 GPUs, we want to run a job with the operator that launches 2 pods, with each pod requiring 8 GPUs.
In this situation, the environment variable PET_NNODES should be set to 2, indicating we have 2 nodes.
The environment variable WORLD_SIZE should be set to 16, indicating the total number of GPUs across nodes that will be used (2 nodes x 8 GPUs per node = 16 GPUs total).
Each of the 2 pods will have 8 processes, and each process will be bound to 1 GPU.
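A rough sketch of the expected process layout in that scenario, assuming the torchrun definitions of WORLD_SIZE, RANK and LOCAL_RANK:

# Expected layout for 2 pods x 8 GPUs, assuming torchrun's definitions of
# WORLD_SIZE / RANK / LOCAL_RANK (illustrative only).
nnodes, nproc_per_node = 2, 8          # 2 pods, 8 processes (one per GPU) per pod
world_size = nnodes * nproc_per_node   # 16

for node_rank in range(nnodes):
    for local_rank in range(nproc_per_node):
        global_rank = node_rank * nproc_per_node + local_rank
        print(f"pod={node_rank} LOCAL_RANK={local_rank} "
              f"RANK={global_rank} WORLD_SIZE={world_size}")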

@tenzen-y
Member

Each of the 2 pods will have 8 processes, and each process will be bound to 1 GPU.

@kuizhiqing Does that mean we launch multiple ranks in a pod in this case?

@tenzen-y
Member

tenzen-y commented May 10, 2023

If so, we may be able to add RanksPerWorker to PyTorchJob, similar to the SlotsPerWorker of MPIJob.

// Specifies the number of slots per worker used in hostfile.
// Defaults to 1.
// +optional
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

@kuizhiqing
Member

kuizhiqing commented May 11, 2023

Does that mean we launch multiple ranks in a pod in this case?

@tenzen-y

Yes, since @isuyyy is using torchrun, which is an alias for python -m torch.distributed.run, as the entrypoint.
In the case mentioned above, launching 2 pods with 8 GPUs each would in practice be more efficient than launching 16 pods with 1 GPU each, even though the latter approach is more consistent with the one-process-per-container philosophy.

@tenzen-y
Member

In the case mentioned above, launching 2 pods with 8 GPUs each would in practice be more efficient than launching 16 pods with 1 GPU each, even though the latter approach is more consistent with the one-process-per-container philosophy.

@kuizhiqing Agree. It might be worth mentioning in the docs that the training operator doesn't support launching multiple ranks in one pod.

@kuizhiqing
Member

@tenzen-y IMO, it would be better to support and encourage users to use torchrun as the entrypoint for PyTorch distributed training. This can be implemented by taking resource declaration into consideration.

@johnugeorge
Member

johnugeorge commented May 11, 2023

@tenzen-y let me try to understand your statement that the training operator doesn't support launching multiple ranks in one pod.

In a cluster of 2 nodes with 8 GPUs each, I see a couple of options.

Option 1:
In the bare-metal case, run torchrun --nnodes=2 --nproc-per-node=1 train.py on each node, where train.py uses 8 GPUs within the code (all CUDA devices on the node).

With the operator, this is equivalent to a worker spec with 2 replicas scheduled on 2 different nodes, with an 8-GPU limit for each replica.

World size is 2 in both cases.

Option 2:

In the bare-metal case, run torchrun --nnodes=2 --nproc-per-node=8 train.py on each node, where train.py uses 1 GPU.

With the operator, we can simulate a similar behaviour with a worker spec of 16 replicas, each with a 1-GPU limit.

World size is 16 in both cases.
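The world-size arithmetic behind both options, as a tiny sketch (the helper name is just illustrative):

# World size as torchrun computes it: nodes (pods) times processes per node.
def world_size(nnodes: int, nproc_per_node: int) -> int:
    return nnodes * nproc_per_node

# Option 1: 2 nodes/replicas, 1 process each; train.py drives all 8 GPUs itself.
assert world_size(nnodes=2, nproc_per_node=1) == 2
# Option 2, bare metal: 2 nodes, 8 processes per node, 1 GPU per process.
assert world_size(nnodes=2, nproc_per_node=8) == 16
# Option 2, operator: 16 replicas (pods), 1 process per pod.
assert world_size(nnodes=16, nproc_per_node=1) == 16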

Are we talking about the same use case or something different? I am not sure if we want to bring the nproc-per-node concept to the operator.

@tenzen-y
Member

@johnugeorge I was referring to the following case:

apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: pytorch-simple
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727
              imagePullPolicy: Always
              command:
                - "torchrun --nnodes=2  --nproc-per-node=8 python train.py"
              resources:
                limits:
                  example.com/gpu: 8

IIUC, the training operator doesn't support nproc-per-node values other than 1.

@johnugeorge
Member

@tenzen-y Is there a specific reason that you want to run the command explicitly as torchrun --nnodes=2 --nproc-per-node=8 train.py if it can be achieved in a different manner, as in #1790 (comment)?

@tenzen-y
Member

Is there a specific reason that you want to run the command explicitly as torchrun --nnodes=2 --nproc-per-node=8 train.py

@johnugeorge No, I don't have such use cases. I just wanted to say that mentioning in the docs that the training operator doesn't support launching multiple ranks in one pod might be better.

This means we should suggest that users set an appropriate number of replicas in .spec.pytorchReplicaSpecs.Worker.replicas, instead of using torchrun --nnodes=2 --nproc-per-node=8 train.py.

@tenzen-y
Member

However, @kuizhiqing has another opinion.

@tenzen-y IMO, it would be better to support and encourage users to use torchrun as the entrypoint for PyTorch distributed training. This can be implemented by taking resource declaration into consideration.

#1790 (comment)

@kuizhiqing
Member

kuizhiqing commented May 11, 2023

@tenzen-y @johnugeorge

In fact, I have several rationales to substantiate my perspective:

  1. torchrun is the generic approach to running PyTorch distributed training tasks, not an external dependency; it was designed to manage multiple processes on a node.
  2. There is a performance loss if we do not allocate 8 GPUs to 1 pod; more specifically, about 3% for the 16-GPU case based on my experiment.
  3. To train large language models, on more than 1k nodes for example, too many pods may increase the overhead on the Kubernetes cluster, which also makes startup slower.
  4. Also in the large language model scenario, hybrid parallel strategies are applied, including data parallelism, tensor parallelism and pipeline parallelism, which are rank sensitive; for example, we prefer tensor parallelism to be used within a node, which is more performance efficient.
  5. Also for hybrid parallel strategies, in some cases the 8 GPUs in the same node may share some data locally at runtime.
  6. The rank should indicate physical proximity: applications often use GLOBAL_RANK for the overall id of a GPU and LOCAL_RANK for the id of a GPU within its node, and many strategies are built on this, so it is important that ranks are ordered (see the sketch below).

In summary, based on these points, using 8 GPUs per pod and specifying the ranks explicitly for hybrid parallelism are important for performance in PyTorch distributed training of large language models.
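To illustrate point 6, a minimal sketch assuming the torchrun-style env variables RANK, LOCAL_RANK and LOCAL_WORLD_SIZE are present:

import os

# With ordered ranks, the processes of one node occupy a contiguous rank range,
# which node-local strategies (e.g. intra-node tensor parallelism) rely on.
nproc_per_node = int(os.environ["LOCAL_WORLD_SIZE"])
global_rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])

node_rank = global_rank // nproc_per_node
# Ranks that share this node -- natural candidates for a node-local group.
same_node_ranks = list(range(node_rank * nproc_per_node,
                             (node_rank + 1) * nproc_per_node))
assert global_rank == node_rank * nproc_per_node + local_rank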

Let me know if I should clarify any part of the arguments.

@andreyvelich
Member

andreyvelich commented May 12, 2023

@kuizhiqing Thanks for providing this information. I agree, running multiple GPUs per pod is usually more efficient, so the Training Operator should properly set env variables for the different use cases.

None of our existing PyTorch examples use torchrun, as we always use torch.distributed to set the distributed strategy.

@kuizhiqing @johnugeorge @tenzen-y I think we should discuss this at the upcoming AutoML + Training WG Community Meeting on May 17th.
@kuizhiqing @isuyyy It would be nice if you could find time to attend it and discuss.

@tenzen-y
Member

@kuizhiqing As you say, attaching multiple GPUs to a Pod is worth it.

But I'm not sure why you prefer passing --nproc-per-node=8 to torchrun.
I think we can already attach multiple GPUs to a Pod in the following way:

apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: pytorch-simple
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727
              imagePullPolicy: Always
              command:
+                - "torchrun --nnodes=2 --nproc-per-node=1 train.py"
-                - "torchrun --nnodes=2 --nproc-per-node=8 train.py"
              resources:
                limits:
                  example.com/gpu: 8

Is there any reason you prefer --nproc-per-node=8 instead of --nproc-per-node=1?

@kuizhiqing
Member

@andreyvelich
Recent releases of PyTorch have replaced the torch.distributed.launch utility with torch.distributed.run. This new approach does not require manually setting the WORLD_SIZE environment variable. I think we should aim to support torch.distributed.run in a more generic fashion.

However, the multiple-GPUs-per-pod problem exists with the launch version.

I would like to attend the meeting, but I cannot guarantee my availability since 5:00 PM UTC is not Asian timezone friendly.

@kuizhiqing
Member

@tenzen-y I think we should use --nproc-per-node=8 in this case since I assume we are using 1 process per GPU. This is consistent with the torchrun design. Typically, one GPU will be bound to 1 process or thread. It is possible to do it all in one process, but that may not be efficient. So I think 1 GPU per process is the default setup in production. Please let me know if you have any other use cases.

More specific examples of using torchrun can be found in https://github.com/NVIDIA/Megatron-LM/blob/main/examples/pretrain_gpt_distributed.sh#L20-L25. The nproc_per_node argument may refer to GPUS_PER_NODE.

For the operator, we could implement it using environment variables; please refer to #1573.

@tenzen-y
Member

tenzen-y commented May 13, 2023

Recent releases of PyTorch have replaced the torch.distributed.launch utility with torch.distributed.run. This new approach does not require manually setting the WORLD_SIZE environment variable.

Sounds good to me. In fact, we use torch.distributed.run on Elastic PyTorch Training.

Note:

This module is going to be deprecated in favor of torchrun.

https://pytorch.org/docs/stable/distributed.html#launch-utility

@tenzen-y
Member

@tenzen-y I think we should use --nproc-per-node=8 in this case since I assume we are using 1 process per GPU. This is consistent with the torchrun design. Typically, one GPU will be bound to 1 process or thread. It is possible to do it all in one process, but that may not be efficient. So I think 1 GPU per process is the default setup in production. Please let me know if you have any other use cases.

More specific examples of using torchrun can be found in https://github.com/NVIDIA/Megatron-LM/blob/main/examples/pretrain_gpt_distributed.sh#L20-L25. The nproc_per_node argument may refer to GPUS_PER_NODE.

For the operator, we could implement it using environment variables; please refer to #1573.

@kuizhiqing Thank you for clarifying :) That makes a lot of sense and is great to see!

@tenzen-y
Member

I agree with @kuizhiqing's suggestions.

@tenzen-y IMO, it would be better to support and encourage users to use torchrun as the entrypoint for PyTorch distributed training. This can be implemented by taking resource declaration into consideration.

Any concerns? @johnugeorge

@tenzen-y
Member

@kuizhiqing @johnugeorge @tenzen-y I think we should discuss this at the upcoming AutoML + Training WG Community Meeting on May 17th.

@andreyvelich Sorry for the late response, and thank you for suggesting that. I missed your message.
Maybe for this discussion, asynchronous communication on GitHub is more suitable, since the time zones we are in are very different.

@johnugeorge
Member

johnugeorge commented May 15, 2023

@kuizhiqing Sorry for the late reply. I agree with your points on performance and the related strategies for bare-metal runs. But in the container world, what is the design change that you propose for "8 GPUs per pod and specifying the ranks"? How do you define LOCAL_RANK in this case?

@kuizhiqing
Member

@tenzen-y @johnugeorge Sorry for the recent silence, I was quite occupied. When I'm available, I'd like to make a proposal about PyTorch operator adaptation that takes PyTorch and its applications, such as Megatron and DeepSpeed, into consideration.

@tenzen-y
Member

tenzen-y commented Jun 6, 2023

Thanks! @kuizhiqing
I'm looking forward to the proposal!

@johnugeorge
Member

Thanks @kuizhiqing

Looking forward to it

@nairbv

nairbv commented Aug 2, 2023

The WORLD_SIZE environment variable in PyTorch is different from its definition

This also implies RANK is set to a value different from its definition, right?

I believe RANK should be in the range 0 to (num_nodes * num_procs_per_node - 1). From the torchrun docs, RANK is defined as "The global rank." In reality, I think we're getting RANK values referring to the node index, not the global worker rank.

@brannondorsey

Was this solved by #1840?

@nairbv

nairbv commented Sep 27, 2023

Looks like it fixes WORLD_SIZE but doesn't make a corresponding change to RANK:

https://github.com/kubeflow/training-operator/pull/1840/files#diff-758e05740d851d22594c040993aae5e634649b5ddb2aae0f634d7842543dd3cfR65

@brannondorsey

Any known workarounds at the moment?

I suppose a user-defined worker process could overwrite the RANK environment variable internally if it knows its own node number (PET_NODE_RANK, perhaps) and LOCAL_RANK.
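Something along these lines, as a sketch only (which env variables are actually present depends on the launch mode, and the fallback values here are assumptions):

import os
import torch.distributed as dist

# Recompute the global rank inside the worker from the pod's node rank and the
# process's local rank, then overwrite RANK before initializing the process
# group. PET_* names follow the discussion above; availability depends on the
# launch mode.
nproc_per_node = int(os.environ.get("LOCAL_WORLD_SIZE", "8"))
node_rank = int(os.environ["PET_NODE_RANK"])
local_rank = int(os.environ.get("LOCAL_RANK", "0"))

os.environ["RANK"] = str(node_rank * nproc_per_node + local_rank)
os.environ["WORLD_SIZE"] = str(int(os.environ["PET_NNODES"]) * nproc_per_node)

# env:// initialization also needs MASTER_ADDR / MASTER_PORT in the environment.
dist.init_process_group(backend="nccl", init_method="env://")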

@kuizhiqing
Member

@brannondorsey @nairbv What's your launch mode? As I mentioned in the PR and the discussion, we support 3 launch modes, and RANK may actually be overwritten.

@brannondorsey

brannondorsey commented Sep 28, 2023

@kuizhiqing I'm looking to perform multi-node distributed training with a torchrun entrypoint. Workers will be scheduled to 8x GPU nodes, and I'd like to use 8x processes per container (where 8 GPUs are made available to each container via a resource request).

I'm looking to basically do what's described in #1872, or your proposal in #1836.

@nairbv

nairbv commented Sep 28, 2023

@kuizhiqing same here, running training jobs with torchrun. The current workaround is to just use whatever values are available, based on whatever they mean when passed to us, even if they don't match the documentation.

I don't know much about kubeflow/training operators, but as an example I see we have a Helm chart with a bunch of commands doing things like torchrun --nnodes=${WORLD_SIZE} --node_rank=${RANK}. It works, but I found it confusing because semantically that's not what the docs say those env variables mean (and I think these are variables that torchrun will end up setting/overriding in the actual job).

Definitions and env variables expected by torchrun:
https://pytorch.org/docs/stable/elastic/run.html#environment-variables

Also, the fix will be BC-breaking for this workaround, since we'll still need to pass values that mean "number of nodes" (etc.) to each of these commands.

@kuizhiqing
Member

Hi @nairbv @brannondorsey

First of all, we do not need to pass env values as command arguments any more; PyTorch will handle all the env variables with the PET_ prefix, ref.

Well, nnodes means the number of nodes, i.e. pods in the Kubernetes context, so nnodes equals the total replicas defined in the spec, see ref.

For RANK, it will be overwritten, as it should be. Let's try to understand it as follows: we have 8 GPUs in one pod, so we have 8 processes in it; they share the same env but with different RANK values, while torchrun is the MAIN process in the pod that manages the 8 processes, so it will and should handle the env for its subprocesses. In short, the env is assigned to the pod, i.e. to the main process, not to the worker processes bound to the GPUs.
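Roughly, the main process does something like the following sketch (torchrun's real implementation differs, and the PET_* defaults here are assumptions):

import os
import subprocess
import sys

# The pod-level env set by the operator is consumed by the main process, which
# derives a per-process env (RANK, LOCAL_RANK, WORLD_SIZE) for each worker.
nproc_per_node = 8
node_rank = int(os.environ.get("PET_NODE_RANK", "0"))
world_size = int(os.environ.get("PET_NNODES", "2")) * nproc_per_node

workers = []
for local_rank in range(nproc_per_node):
    env = dict(os.environ,
               WORLD_SIZE=str(world_size),
               RANK=str(node_rank * nproc_per_node + local_rank),
               LOCAL_RANK=str(local_rank))
    workers.append(subprocess.Popen([sys.executable, "train.py"], env=env))
for w in workers:
    w.wait()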

Hope it helps; feel free to continue the discussion if anything remains unclear.

@nairbv

nairbv commented Sep 28, 2023

Well, nnodes means the number of nodes, i.e. pods in the Kubernetes context, so nnodes equals the total replicas defined in the spec, see ref.

Right, that's clear, and that's what nnodes means to torchrun.

After the referenced PR, WORLD_SIZE here will also mean the same thing that it means in torchrun (the number of processes across all nodes). If RANK (in the pod env) doesn't change, it means that RANK will (still) be in the range of 1...num_pods (no longer in the range 1...WORLD_SIZE like it used to be, and like it is in PyTorch). When the name is re-used and overwritten by torchrun, it will then become 1...WORLD_SIZE. Doesn't that name reuse seem confusing?

The title of this issue seems to suggest that the env variables should match what's expected by PyTorch, but maybe that's not the goal. Personally, I might prefer it if RANK weren't set, and instead the rank of the node/pod were called something like NODE_RANK or PET_NODE_RANK or POD_RANK.

@brannondorsey

brannondorsey commented Sep 28, 2023

PyTorch Lightning's KubeflowEnvironment.global_rank() method uses RANK to provide a value in 1...WORLD_SIZE space. This is the value I would expect; however, my understanding is that this is not the way the training operator sets RANK now if a master replica is defined.

Unfortunately, the KubeflowEnvironment.local_rank() method also returns the constant 0, which wouldn't be well behaved in the scenario we are describing.

Right now, I'm basically here...

  1. I think there is a way to do what I'm looking to do with PyTorchJob, but it may involve some environment variable overwriting/ignoring.
  2. It isn't clear to me what the appropriate behavior/path forward would be. I like @nairbv's suggestion that RANK either not be set if it is wrong, or that separate GLOBAL_RANK and LOCAL_RANK variables be managed. But this would need to be coordinated with external packages like PyTorch and PyTorch Lightning.

@kuizhiqing
Member

You are right, maybe not.

The goal of this operator is to allow users to run PyTorch jobs in different ways; there are many methods indeed. We take compatibility as a priority: we set almost all env variables for all possible launch methods and ensure it works. If you find it does not work for a conventional setting, please let me know.

The architecture of distributed process management is quite different for different launch methods, and the situation is even worse if we take elastic mode into consideration. It would be easier for the operator to fix the entry mode, but that's not the case for now.

Technically, the operator runs before the user's processes, so we do not know the launch method. It's too tricky to inspect the user command, and we cannot do that if it is wrapped in a shell script.

@nairbv

nairbv commented Sep 29, 2023

As a user, one of the things I found confusing about RANK was that I assumed that, since it was set, the operator must be working at the level of individual processes (and I was wondering how that could interact with torchrun, which would usually start the processes). In PyTorch, RANK (and LOCAL_RANK) are set at the process level, so for a valid value to exist, the (e.g. 8) processes per node would already have to have been kicked off. It just made it hard for me to get my head around what the architecture was, what was going on, or how to properly set the parameters to torchrun.

Without actually creating the per-node processes, I don't think it's possible to set correct RANK or LOCAL_RANK vars that would conform to the torchrun definition (which doesn't matter because torchrun can set them itself), but something like NODE_RANK would be clear and meaningful in this context.

I agree it's challenging and requires coordination, though, since the change isn't backwards compatible: e.g. if RANK is removed and replaced with NODE_RANK, then users need to change any code referencing RANK. Then again, the definition of WORLD_SIZE is already changing from nnodes to the global number of processes (to conform to torchrun), so it might be a good time to change both.


This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.


This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.
