
How to all_gather Tensor if not the same length #1569

Open
Nic-Ma opened this issue Jan 22, 2021 · 16 comments

Nic-Ma (Contributor) commented Jan 22, 2021

❓ Questions/Help/Support

Hi @vfdev-5 ,

I am developing a distributed evaluation feature and facing a problem: the preds and labels on different GPUs don't have the same length, so ignite.idist.all_gather() can't work. For example, GPU0 has 5 images to handle and GPU1 has 4, for 9 images in total.
Could you please advise on how to idist.all_gather() these values?
I don't want to pad the input data to make it evenly divisible, because that would make the metrics differ between single-GPU and multi-GPU runs.

Thanks in advance.

Nic-Ma (Contributor, Author) commented Jan 22, 2021

BTW, I mean that when using ignite.metrics.EpochMetric, the inputs preds and labels have different lengths on different GPUs.

Thanks.

vfdev-5 (Collaborator) commented Jan 22, 2021

Hi @Nic-Ma ,

I think this can be done with the broadcast collective op:

  • each process broadcasts its data to everyone
  • concatenate the results

This can probably be slow and thus may not always be usable... I'll prototype a code snippet a bit later; a rough sketch of the broadcast idea is shown just below.

In issue #1288, we thought about providing a feature to customize the reduction/gather ops for metrics, so that users could plug in their own adapted solution as well.
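A minimal sketch of that broadcast idea, assuming plain torch.distributed with an already initialized process group and 1-D tensors (the helper name gather_uneven_via_broadcast is hypothetical, not an ignite API):

import torch
import torch.distributed as dist


def gather_uneven_via_broadcast(tensor: torch.Tensor) -> torch.Tensor:
    # each rank broadcasts its length, then its data; every rank concatenates the pieces
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    pieces = []
    for src in range(world_size):
        # 1) broadcast the length of rank `src`'s tensor so everyone can allocate a buffer
        length = torch.tensor([tensor.shape[0] if rank == src else 0], device=tensor.device)
        dist.broadcast(length, src=src)
        # 2) broadcast the data itself
        buf = tensor if rank == src else torch.empty(
            int(length.item()), dtype=tensor.dtype, device=tensor.device
        )
        dist.broadcast(buf, src=src)
        pieces.append(buf)
    # 3) concatenate the results from all ranks
    return torch.cat(pieces, dim=0)

This does world_size sequential broadcasts per call, which is why it may be slower than a single padded all_gather.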

Nic-Ma (Contributor, Author) commented Jan 23, 2021

Hi @vfdev-5 ,

Or maybe we can pad the tensor with NaN values to make it evenly divisible, and remove the NaN entries after all_gather()?

Thanks.

vfdev-5 (Collaborator) commented Jan 23, 2021

Or maybe we can pad the tensor with NaN values to make it evenly divisible, and remove the NaN entries after all_gather()?

Well, yes, padding can be an option too. But in your first message you said you did not want to pad...

Nic-Ma (Contributor, Author) commented Jan 25, 2021

Hi @vfdev-5 ,

Sorry, I didn't make it clear: I mean we don't want to pad the data in the DistributedSampler before model prediction.
So maybe it's possible to pad with NaN only right before all_gather() and remove the NaN entries afterwards.

Thanks.

Nic-Ma (Contributor, Author) commented Jan 25, 2021

Hi @vfdev-5 ,

I am facing an issue with idist.all_gather() for strings:
here I put all the filenames from one GPU into the self._filenames list, join them into one string, and then try to gather it, but it always hangs at the all_gather line.

_filenames = "\t".join(self._filenames)
_filenames = idist.all_gather(_filenames)

My PyTorch version is 1.7.0 and the GPU is a V100; do you know any reason for this issue?

Thanks.

vfdev-5 (Collaborator) commented Jan 25, 2021

Hi @Nic-Ma

here is a draft version of `all_gather` with padding:
# Question : https://github.com/pytorch/ignite/issues/1569

import os
import time
from typing import Optional, List

import torch
import torch.nn.functional as F
import torch.distributed as dist


def compute_padding(shape, new_shape):
    padding = []
    for dim, new_dim in zip(shape, new_shape):
        padding.insert(0, new_dim - dim)
        padding.insert(0, 0)
    return padding


def all_gather(tensor: torch.Tensor, fixed_shape: Optional[List] = None) -> List[torch.Tensor]:
    input_shape = tensor.shape
    if fixed_shape is not None:
        padding = compute_padding(tensor.shape, fixed_shape)
        if sum(padding) > 0:
            tensor = F.pad(tensor, pad=padding, mode='constant', value=0)
    
    output = [torch.zeros_like(tensor) for _ in range(dist.get_world_size())]
    dist.all_gather(output, tensor)

    all_input_shapes = None
    if fixed_shape is not None:
        # gather all shapes
        tensor_shape = torch.tensor(input_shape, device=tensor.device)
        all_input_shapes = [torch.zeros_like(tensor_shape) for _ in range(dist.get_world_size())]
        dist.all_gather(all_input_shapes, tensor_shape)
        all_input_shapes = [t.tolist() for t in all_input_shapes]

    if all_input_shapes:
        for i, shape in enumerate(all_input_shapes):
            padding = compute_padding(output[i].shape, shape)
            if sum(padding) < 0:
                output[i] = F.pad(output[i], pad=padding)

    return output


if __name__ == "__main__":
    
    dist.init_process_group("nccl", init_method="env://")
    
    lrank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(lrank)
    rank = dist.get_rank()

    size = 5
    uneven_size = 2
    t = torch.arange(dist.get_world_size() * size - uneven_size, device="cuda")

    input_tensor = t[size * rank: size * (rank + 1)]
    time.sleep(0.5 * (dist.get_rank() + 1))
    print(rank, " - Input : ", input_tensor)

    result = all_gather(input_tensor, fixed_shape=(size, ))

    time.sleep(0.5 * (dist.get_rank() + 1))
    print(rank, " - Output: ", result)

    dist.destroy_process_group()

Run as

python -u -m torch.distributed.launch --nproc=2 --use_env question-uneven-input-all-gather.py 

0  - Input :  tensor([0, 1, 2, 3, 4], device='cuda:0')
1  - Input :  tensor([5, 6, 7], device='cuda:1')
0  - Output:  [tensor([0, 1, 2, 3, 4], device='cuda:0'), tensor([5, 6, 7], device='cuda:0')]
1  - Output:  [tensor([0, 1, 2, 3, 4], device='cuda:1'), tensor([5, 6, 7], device='cuda:1')]

As for the idist.all_gather hang with string input, let me try to reproduce it and see.

BTW, what is the best way to reach you to discuss a few things?
I sent you a few messages on the MONAIBoot2020 Slack...

vfdev-5 (Collaborator) commented Jan 25, 2021

@Nic-Ma I couldn't reproduce your issue.

Here is my code:
import os
import time

import torch
import torch.distributed as dist
import ignite
import ignite.distributed as idist


if __name__ == "__main__":

    print(torch.__version__, ignite.__version__)

    dist.init_process_group("nccl", init_method="env://")
    
    lrank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(lrank)
    rank = dist.get_rank()

    all_filenames = [f"filename_{i}_{rank}" for i in range(10)]
    data = "\t".join(all_filenames)
    time.sleep(0.5 * (dist.get_rank() + 1))
    print(rank, " - Input : ", data)

    result = idist.all_gather(data)

    time.sleep(0.5 * (dist.get_rank() + 1))
    print(rank, " - Output: ", result)

    dist.destroy_process_group()

and the output:

# python -u -m torch.distributed.launch --nproc=2 --use_env issue-1569-repro-hang-idist-gather-string.py 
1.7.1 0.5.0.dev20210121
1.7.1 0.5.0.dev20210121
0  - Input :  filename_0_0      filename_1_0    filename_2_0    filename_3_0    filename_4_0    filename_5_0    filename_6_0    filename_7_0    filename_8_0    filename_9_0
1  - Input :  filename_0_1      filename_1_1    filename_2_1    filename_3_1    filename_4_1    filename_5_1    filename_6_1    filename_7_1    filename_8_1    filename_9_1
0  - Output:  ['filename_0_0\tfilename_1_0\tfilename_2_0\tfilename_3_0\tfilename_4_0\tfilename_5_0\tfilename_6_0\tfilename_7_0\tfilename_8_0\tfilename_9_0', 'filename_0_1\tfilename_1_1\tfilename_2_1\tfilename_3_1\tfilename_4_1\tfilename_5_1\tfilename_6_1\tfilename_7_1\tfilename_8_1\tfilename_9_1']
1  - Output:  ['filename_0_0\tfilename_1_0\tfilename_2_0\tfilename_3_0\tfilename_4_0\tfilename_5_0\tfilename_6_0\tfilename_7_0\tfilename_8_0\tfilename_9_0', 'filename_0_1\tfilename_1_1\tfilename_2_1\tfilename_3_1\tfilename_4_1\tfilename_5_1\tfilename_6_1\tfilename_7_1\tfilename_8_1\tfilename_9_1']

Please let me know if my example correctly reproduces your issue. Thanks

Nic-Ma (Contributor, Author) commented Jan 25, 2021

Hi @vfdev-5 ,

Thanks very much for your detailed program!
I also can't reproduce the issue with your program; let me try to write a program that reproduces it.
And BTW, I don't know why I can't log in to the BootCamp workspace in Slack anymore... so maybe let's communicate by email? My email nma@nvidia.com is always available.

Thanks.

Nic-Ma (Contributor, Author) commented Jan 25, 2021

I compared your program with mine and found the root cause. Sorry, it was my mistake; all_gather works for my string now.
Thanks very much for your help and the example program!

vfdev-5 (Collaborator) commented Jan 25, 2021

@Nic-Ma how about this code: #1569 (comment) for all_gather on tensors of different shapes?

We may think about adding more functions like that (e.g. all_reduce) and putting them into a contrib.distributed.utils module...

Nic-Ma (Contributor, Author) commented Jan 26, 2021

Hi @vfdev-5 ,

Your example code looks good, and I have now developed an evenly_divisible_all_gather() in MONAI to handle this case:

import torch
import ignite.distributed as idist


def evenly_divisible_all_gather(data: torch.Tensor):
    """
    Utility function for distributed data parallel to pad tensor to make it evenly divisible for all_gather.
    Args:
        data: source tensor to pad and execute all_gather in distributed data parallel.

    """
    if idist.get_world_size() <= 1:
        return data
    # make sure the data is evenly-divisible on multi-GPUs
    length = data.shape[0]
    all_lens = idist.all_gather(length)
    max_len = max(all_lens).item()
    if length < max_len:
        size = [max_len - length] + list(data.shape[1:])
        data = torch.cat([data, data.new_full(size, float("NaN"))], dim=0)
    # all gather across all processes
    data = idist.all_gather(data)
    # delete the padding NaN items
    return torch.cat([data[i * max_len : i * max_len + l, ...] for i, l in enumerate(all_lens)], dim=0)
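A hypothetical usage sketch for the helper above (not from MONAI): each rank produces preds/labels of uneven length, they are gathered without the padding leaking into the result, and the metric is computed on the full data.

import torch
import ignite.distributed as idist

# hypothetical uneven per-rank batch: 5 samples on rank 0, 4 on the other ranks
n = 5 if idist.get_rank() == 0 else 4
preds = torch.rand(n, 2, device=idist.device())
labels = torch.randint(0, 2, (n,), device=idist.device()).float()  # float so the NaN padding is representable

all_preds = evenly_divisible_all_gather(preds)
all_labels = evenly_divisible_all_gather(labels)

if idist.get_rank() == 0:
    # metric computed on the complete, un-padded data (9 samples in the 2-GPU example)
    accuracy = (all_preds.argmax(dim=1) == all_labels).float().mean()
    print("accuracy:", accuracy.item())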

Thanks.

insung3511 commented
Can I apply the above code directly with the torch.distributed module's all_gather? I found a tensor shape issue in the torch.distributed all_gather method.

vfdev-5 (Collaborator) commented Jan 5, 2024

@insung3511 yes, you can rewrite the above code using torch.distributed, but it would require writing more code, as idist.all_gather is specifically made to simplify it.
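For illustration, a rough sketch of what such a rewrite could look like (my sketch, not MONAI's or ignite's code), assuming the process group is already initialized and data lives on the device expected by the backend; it pads with zeros instead of NaN so integer tensors also work:

import torch
import torch.distributed as dist


def evenly_divisible_all_gather_dist(data: torch.Tensor) -> torch.Tensor:
    # hypothetical helper built directly on torch.distributed
    world_size = dist.get_world_size()
    if world_size <= 1:
        return data
    # gather each rank's first-dimension length
    length = torch.tensor([data.shape[0]], device=data.device)
    all_lens = [torch.zeros_like(length) for _ in range(world_size)]
    dist.all_gather(all_lens, length)
    all_lens = [int(l.item()) for l in all_lens]
    max_len = max(all_lens)
    # pad this rank's tensor to the common length
    if data.shape[0] < max_len:
        pad_size = [max_len - data.shape[0]] + list(data.shape[1:])
        data = torch.cat([data, data.new_zeros(pad_size)], dim=0)
    # gather the padded tensors and crop each back to its true length
    output = [torch.zeros_like(data) for _ in range(world_size)]
    dist.all_gather(output, data)
    return torch.cat([o[:l] for o, l in zip(output, all_lens)], dim=0)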

francescotaioli commented
I simulated a scenario in which I want to gather the predictions (maybe logits) and the labels in order to calculate metrics like AP, etc.


import os
import time
import torch
import torch.distributed as dist

if __name__ == "__main__":
    '''
    Simulate gathering labels (Xi) and predictions (Xi + 1) from all the GPUs using Python picklable objects.
    In this way, we don't actually need to pad the tensors to the same size, which is a requirement for the all_gather function.

    Moreover, we are sure that each result can be matched to its rank (because the key of each gathered dict is the rank itself)
    '''
    
    dist.init_process_group("nccl", init_method="env://")
    
    lrank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(lrank)
    rank = dist.get_rank()


    if rank == 0:
        labels = torch.arange(1,3, device="cuda")
        predictions = torch.arange(2,4, device="cuda")
    else:
        labels = torch.arange(10,end=22, device="cuda")
        predictions = torch.arange(11,end=23, device="cuda")
    
    all_labels = {rank: labels.tolist()}
    all_predictions = {rank: predictions.tolist()}

    print(f"Rank: {rank} - Labels: {labels}")
    print(f"Rank: {rank} - Predictions: {predictions}")


    output_list_labels = [torch.zeros_like(torch.empty(1)) for _ in range(dist.get_world_size())]
    output_list_predictions = [torch.zeros_like(torch.empty(1)) for _ in range(dist.get_world_size())]
    

    if rank == 1:
        time.sleep(2) # simulate delay

    dist.all_gather_object(output_list_labels, all_labels)
    dist.all_gather_object(output_list_predictions, all_predictions)

    
    if rank == 0:
        print()
        print(f"Gather all the tensors at rank {rank}!")
        print("Labels: ", output_list_labels)
        print("Predictions: ", output_list_predictions)

        # take preds and labels from the same rank
        # ...
    
    dist.destroy_process_group()

Run with: torchrun --rdzv_backend c10d --rdzv_endpoint localhost:0 --nnodes 1 --nproc_per_node 2 FILE.py

vfdev-5 (Collaborator) commented Jan 25, 2024

Same code as above with ignite -> shorter and simpler:

import time

import torch
import ignite.distributed as idist


def func(local_rank):
    '''
    Simulate gathering labels (Xi) and predictions (Xi + 1) from all the GPUs using Python picklable objects.
    In this way, we don't actually need to pad the tensors to the same size, which is a requirement for the all_gather function.

    Moreover, we are sure that each result can be matched to its rank (because the key of each gathered dict is the rank itself)
    '''
    
    rank = idist.get_rank()
    device = idist.device()

    if rank == 0:
        labels = torch.arange(1,3, device=device)
        predictions = torch.arange(2,4, device=device)
    else:
        labels = torch.arange(10,end=22, device=device)
        predictions = torch.arange(11,end=23, device=device)
    
    all_labels = {rank: labels.tolist()}
    all_predictions = {rank: predictions.tolist()}

    print(f"Rank: {rank} - Labels: {labels}")
    print(f"Rank: {rank} - Predictions: {predictions}")

    if rank == 1:
        time.sleep(2) # simulate delay

    output_list_labels = idist.all_gather(all_labels)
    output_list_predictions = idist.all_gather(all_predictions)
    
    if rank == 0:
        print()
        print(f"Gather all the tensors at rank {rank}!")
        print("Labels: ", output_list_labels)
        print("Predictions: ", output_list_predictions)

        # take preds and labels from the same rank
        # ...    
    
idist.spawn("gloo", func, (), nproc_per_node=2)    

Output:

Rank: 0 - Labels: tensor([1, 2])
Rank: 1 - Labels: tensor([10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21])
Rank: 0 - Predictions: tensor([2, 3])
Rank: 1 - Predictions: tensor([11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22])

Gather all the tensors at rank 0!
Labels:  [{0: [1, 2]}, {1: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]}]
Predictions:  [{0: [2, 3]}, {1: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]}]
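A possible continuation of the rank-0 branch above (my sketch, not part of the original snippet): merge the gathered per-rank dicts back into flat, rank-ordered tensors before computing metrics.

import torch


def merge_by_rank(gathered):
    # `gathered` is a list of single-entry dicts like [{0: [...]}, {1: [...]}] as produced above
    merged = {}
    for d in gathered:
        merged.update(d)
    # concatenate the values ordered by rank
    return torch.tensor([v for rank in sorted(merged) for v in merged[rank]])

# e.g. on rank 0:
# merge_by_rank(output_list_labels)       -> tensor([ 1,  2, 10, 11, ..., 21])
# merge_by_rank(output_list_predictions)  -> tensor([ 2,  3, 11, 12, ..., 22])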
