DDP (multi GPU) Iterable Dataset is not working as expected ? #15734

Open
s4sarath opened this issue Nov 19, 2022 · 43 comments
Labels
question Further information is requested

Comments

@s4sarath

Bug description

Hi,

I am currently testing IterableDataset with DDP.

Total examples: 10000
Batch size: 32
Number of GPUs: 2

With an IterableDataset on 2 GPUs, I expected one epoch to run 157 steps (10000 examples / 32 per batch / 2 GPUs). Instead, it runs 314 steps (10000 / 32), i.e. each GPU iterates over the full dataset.

This only happens with IterableDataset. With a regular map-style Dataset from torch, everything works as expected. Is there a reason for this behaviour?
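
The step counts under discussion follow from ceiling division. A minimal sketch of that arithmetic, assuming the last partial batch is kept (the DataLoader default, drop_last=False) and that sharded data is split evenly across GPUs; the helper name is hypothetical:

```python
import math

def steps_per_epoch(num_examples: int, batch_size: int, num_replicas: int = 1) -> int:
    """Batches each replica sees per epoch when the data is split evenly."""
    per_replica = math.ceil(num_examples / num_replicas)
    return math.ceil(per_replica / batch_size)

# Every GPU iterating the full dataset (no sharding).
print(steps_per_epoch(10_000, 32))                   # 313
# Each of 2 GPUs iterating only its own half.
print(steps_per_epoch(10_000, 32, num_replicas=2))   # 157
```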

How to reproduce the bug

import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import lightning as L
import torch
import time
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import DataLoader, Dataset


BATCH_SIZE = 32
NUM_WORKERS = 1

# Load Dataset in Memory

imdb_data = load_dataset("imdb")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def tokenize_text(batch):
    return tokenizer(batch["text"], truncation=True, padding=True)

imdb_dataset = imdb_data
imdb_tokenized = imdb_dataset.map(tokenize_text, batched=True, batch_size=None)
imdb_tokenized.set_format("torch", columns=["input_ids", "attention_mask", "label"])

def custom_iterator():
    # Yields every training example; nothing here is sharded per rank or per worker.
    for item in imdb_tokenized['train']:
        inputs = {'input_ids': item['input_ids'], 'attention_mask': item['attention_mask']}
        labels = {'labels': item['label']}
        yield inputs, labels


class MyIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self):
        super().__init__()

    def __iter__(self):
        yield from custom_iterator()

train_dataset = MyIterableDataset()

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    persistent_workers=False
)

# Load Model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-uncased", num_labels=2)

# Lightning Module
class LightningModel(L.LightningModule):
    def __init__(self, model, learning_rate=5e-5):
        super().__init__()

        self.learning_rate = learning_rate
        self.model = model

    def forward(self, input_ids, attention_mask, labels):
        return self.model(input_ids, attention_mask=attention_mask, labels=labels)
        
    def training_step(self, batch, batch_idx):
        
        inputs, labels = batch
        outputs = self(inputs["input_ids"], attention_mask=inputs["attention_mask"],
                    labels=labels["labels"])        
        self.log("train_loss", outputs["loss"])
        
        # print(" Tensor sum ", torch.sum(inputs['input_ids']))
        # print("-------------------")
        # print(3*"\n")
        
        self.log("tensor_sum", torch.sum(inputs['input_ids']))
        
        return outputs["loss"]  # this is passed to the optimizer for training

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer
    

lightning_model = LightningModel(model)

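# Import loggers from the same package as `import lightning as L` to avoid mixing the two distributions
from lightning.pytorch.loggers import CSVLogger, WandbLogger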

name = "train_ddp_map-iterable"
logger = CSVLogger(save_dir="logs/", name=name)
wandb_logger = WandbLogger(project="DDP_exps", name=name)

def train_model():
    
    max_epochs = 2
    
    if os.path.exists('checkpoints'):
        import shutil
        shutil.rmtree('checkpoints')
        
    trainer = L.Trainer(
        max_epochs=max_epochs,
        callbacks=None,
        accelerator="gpu",
        devices=[0, 1],
        logger=[logger, wandb_logger],
        strategy='ddp',
        enable_progress_bar=True,  # Show the progress bar
        log_every_n_steps=1,
    )

    trainer.fit(model=lightning_model,
                train_dataloaders=train_loader)
    
if __name__=='__main__':
    
    start_time = time.time()
    train_model()
    end_time = time.time()
    print()
    print("Time taken to train model is {} seconds".format(end_time-start_time))


@s4sarath s4sarath added the needs triage Waiting to be triaged by maintainers label Nov 19, 2022
@awaelchli
Contributor

awaelchli commented Nov 20, 2022

Yes this is expected. Lightning can't know how to shard the data/iterator you provide. You need to make sure your iterator returns half of the data on GPU 0 and the other half on GPU 1. You can do this for example by changing your for loop to something like this (typos expected):

for item in imdb_tokenized['train'][rank::num_gpus]:
    ...

This shards your data. The rank can be accessed, for example, through trainer.global_rank.
If you do this, make sure the iterator returns the same amount of data on each rank (e.g., drop the remainder).

Another way would be to use the DistributedSampler inside your iterable dataset.
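
The slicing idea can be written against any iterator with itertools.islice. A minimal sketch, assuming the total number of items is known so that the remainder can be dropped; shard_by_rank and its parameters are hypothetical names, and rank and world_size would come from the trainer or from torch.distributed:

```python
from itertools import islice
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")

def shard_by_rank(items: Iterable[T], rank: int, world_size: int,
                  total: Optional[int] = None) -> Iterator[T]:
    """Yield every world_size-th item, starting at offset rank.

    If total is given, trailing items that do not divide evenly are dropped
    so that every rank yields the same number of items.
    """
    if total is not None:
        usable = (total // world_size) * world_size
        items = islice(items, usable)
    return islice(items, rank, None, world_size)

for rank in range(3):
    print(rank, list(shard_by_rank(range(10), rank, world_size=3, total=10)))
```

Without total, the ranks can end up with different item counts, which is the situation the "drop the remainder" advice is meant to avoid.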

@awaelchli awaelchli added question Further information is requested and removed needs triage Waiting to be triaged by maintainers labels Nov 20, 2022
@s4sarath
Author

Makes sense.

One more question. If I set num_workers=2 in my DataLoader, the whole training loop runs twice on each GPU. For example:

Assume my IterableDataset has 10000 records and my batch size is 32, so total steps = 10000//32 = 314.

If my num_workers=2, it is looping 628 times in each GPU. Is this expected?
Because, num_workers=2 is supposed to make DataLoader pipeline faster right.

Is there any concept of steps_per_epoch in lightning. Say, epochs=10, steps_per_epoch=1000, I want each epoch to run 1000 loops max.

Keras has a steps-per-epoch setting. Is there a way here to specify that every epoch should run, say, 1000 steps?

@awaelchli
Contributor

If my num_workers=2, it is looping 628 times in each GPU. Is this expected?

No. num_workers has nothing to do with the sampling of the data.

Because, num_workers=2 is supposed to make DataLoader pipeline faster right.

Read more about workers here: https://pytorch.org/docs/stable/data.html#multi-process-data-loading

Is there any concept of steps_per_epoch in lightning. Say, epochs=10, steps_per_epoch=1000, I want each epoch to run 1000 loops max.

Trainer(limit_train_batches=1000, max_epochs=10)

@s4sarath
Author

If my num_workers=2, it is looping 628 times in each GPU. Is this expected?

No. num_workers has nothing to do with the sampling of the data.

Thanks. But if you run the code above with NUM_WORKERS=2, you will notice that on each GPU (I have 2 GPUs) it ran for 626 steps. Expected was 313 steps = 10000//32.

image

If NUM_WORKERS=1, it's working as expected.

image

Sharing results.

@s4sarath
Author

Any update on this please ?

@s4sarath
Author

s4sarath commented Dec 6, 2022

Any update

@amorehead
Contributor

amorehead commented Dec 27, 2022

Yes this is expected. Lightning can't know how to shard the data/iterator you provide. You need to make sure your iterator returns half of the data on GPU 0 and the other half on GPU 1. You can do this for example by changing your for loop to something like this (typos expected):

for item in imdb_tokenized['train'][rank::num_gpus]:
    ...

This shards your data. The rank can be accessed for example through trainer.global_rank. If you do this, you need to make sure the iterator returns the same amount of data on each rank (e.g., drop the remainder)

Another way would be to use the DistributedSampler inside your iterable dataset.

@awaelchli, after trying to use PyTorch's DistributedSampler with an IterableDataset in my application, I observed that DistributedSampler raised an error saying it requires each input dataset to have a len() property. Does this match your understanding, given the context of this discussion? If so, how might we use the DistributedSampler to circumvent the original concern in this issue?

@amorehead
Contributor

amorehead commented Dec 28, 2022

@awaelchli, I second this issue. I am also having difficulty figuring out the simplest way to support both multiple GPUs and multiple DataLoader workers for IterableDatasets when using PyTorch Lightning. None of the examples I have worked through so far work when both of the following hold: (1) num_workers > 0 and (2) len(trainer.devices) > 1.

Would it be possible to put together a simple PyTorch Lightning example of how one can structure their IterableDataset and PyTorch Lightning DataModule to support the two use cases above?

@awaelchli
Contributor

awaelchli commented Dec 29, 2022

Yes, these observations are all expected. This is not special behavior in Lightning, it's just how the IterableDataset and DataLoader are working in PyTorch. In short: When using an iterable dataset, you need to take care of the sampler inside your dataset yourself, and shard/partition the data yourself across workers and devices.

Yes, I can put together an example, but it has to wait a few days until new year.
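
The effect of DataLoader workers on an unsharded iterable dataset can be mimicked without PyTorch. A rough simulation, assuming (as the PyTorch data-loading docs describe) that each worker process gets its own copy of the dataset and iterates it independently; the function names are made up for illustration:

```python
from itertools import islice

def unsharded_stream(num_samples, worker_id, num_workers):
    # Ignores the worker identity: every worker replays the full dataset.
    return iter(range(num_samples))

def sharded_stream(num_samples, worker_id, num_workers):
    # Each worker keeps only every num_workers-th sample, offset by its id.
    return islice(range(num_samples), worker_id, None, num_workers)

def samples_seen(stream_fn, num_samples, num_workers):
    """Collect everything the simulated workers would hand to the training loop."""
    seen = []
    for worker_id in range(num_workers):
        seen.extend(stream_fn(num_samples, worker_id, num_workers))
    return seen

print(len(samples_seen(unsharded_stream, 10, num_workers=2)))  # 20: each sample twice
print(len(samples_seen(sharded_stream, 10, num_workers=2)))    # 10: each sample once
```

This matches the doubling of steps reported above when going from one worker to two.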

@s4sarath
Author

s4sarath commented Dec 30, 2022 via email

@awaelchli
Contributor

awaelchli commented Jan 3, 2023

@s4sarath let's stay on topic.

@s4sarath @amorehead Here is a notebook that explains the difference between the map dataset and iterable dataset with several examples, using dataloader workers and also shows how it behaves across multiple processes. At the bottom, I also show the example with Lightning. I hope this helps your understanding.

@s4sarath
Author

s4sarath commented Jan 3, 2023 via email

@gorold

gorold commented Feb 10, 2023

Thanks @awaelchli for the detailed notebook, it's really helpful! One question I have: does this only work with PyTorch's built-in parallel strategies, or will it also work for other strategies like DeepSpeed? Do we need to call a PyTorch Lightning get_rank/worker_info function that abstracts away the underlying strategy, or does calling the torch function always give the correct information regardless of strategy?

@awaelchli
Contributor

awaelchli commented Feb 11, 2023

@gorold It should work with deepspeed yes, but probably not with the TPU strategy.

I haven't mentioned it in the notebook, but PyTorch is developing torchdata which will address these issues completely, as it is heavily focusing on performant iterable-style data loading together with DataLoader2. It would eliminate essentially all of the boilerplate code I show in that notebook.

@gorold

gorold commented Feb 12, 2023

Thanks a lot!

@senzeyu

senzeyu commented Mar 8, 2023

Thanks so much for sharing! @awaelchli #15734 (comment)
Is there an implementation that would work with an iterable dataset of unknown length?

@awaelchli
Contributor

With a pure PyTorch Iterable dataset, I don't know how to do that cleanly.
But I think if you define the dataset as a data pipe in torchdata, you should be able to add a sharding filter that can handle that.
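
One possibility for streams of unknown length, offered as a sketch rather than an approach endorsed in this thread: read the stream in groups of world_size items and emit only from complete groups. Every rank then yields the same number of items without knowing the length in advance, at the cost of each rank reading the whole stream. The function name is hypothetical.

```python
from itertools import islice

def equal_shard(items, rank, world_size):
    """Yield this rank's item from each complete group of world_size items."""
    stream = iter(items)
    while True:
        group = list(islice(stream, world_size))
        if len(group) < world_size:
            return  # incomplete final group: dropped on every rank
        yield group[rank]

for rank in range(3):
    print(rank, list(equal_shard(range(10), rank, world_size=3)))
```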

@keenjo

keenjo commented Mar 22, 2023

@s4sarath let's stay on topic.

@s4sarath @amorehead Here is a notebook that explains the difference between the map dataset and iterable dataset with several examples, using dataloader workers and also shows how it behaves across multiple processes. At the bottom, I also show the example with Lightning. I hope this helps your understanding.

This notebook seems to work in its most basic form for me, but when I implement this strategy with a batch of tensors (rather than a batch of integers), the distributed sampler doesn't return the tensors in their original form. Instead, I get tensors with totally shuffled numbers from my dataloader. Any idea why this would be the case?

@awaelchli
Contributor

@keenjo By default, the distributed sampler shuffles the data. It has an argument DistributedSampler(shuffle=True|False).
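
To illustrate what the shuffle flag changes, here is a pure-Python imitation of the index selection. It is only an approximation for intuition: the real DistributedSampler uses a torch generator and pads the index list so that it divides evenly, neither of which is reproduced here, and the helper name is invented.

```python
import random

def replica_indices(num_samples, rank, num_replicas, shuffle, seed=0, epoch=0):
    indices = list(range(num_samples))
    if shuffle:
        # Same seed on every replica, so all replicas agree on the permutation.
        random.Random(seed + epoch).shuffle(indices)
    # Each replica takes a strided slice of the (possibly permuted) indices.
    return indices[rank::num_replicas]

print(replica_indices(8, rank=0, num_replicas=2, shuffle=False))  # [0, 2, 4, 6]
print(replica_indices(8, rank=1, num_replicas=2, shuffle=False))  # [1, 3, 5, 7]
print(replica_indices(8, rank=0, num_replicas=2, shuffle=True))   # a permuted subset
```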

@EvanZ

EvanZ commented Apr 6, 2023

I don't understand how this code from that Colab notebook actually works:

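from torch.distributed import get_rank, get_world_size
from torch.utils.data import DistributedSampler, IterableDataset, get_worker_info

class DataParallelIterableDataset(IterableDataset):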

    def __len__(self):
        # Caveat: When using DistributedSampler, we need to know the number of samples in our dataset!
        # Hence, we need to implement `__len__`.
        return NUM_SAMPLES

    def __iter__(self):
        worker_info = get_worker_info()
        num_workers = worker_info.num_workers if worker_info is not None else 1
        worker_id = worker_info.id if worker_info is not None else 0
        
        world_size = get_world_size()
        process_rank = get_rank()

        sampler = DistributedSampler(self, num_replicas=(num_workers * world_size), rank=(process_rank * num_workers + worker_id), shuffle=False)

        # The sampler yields integer indices, so this example emits indices rather than records.
        for i in iter(sampler):
            yield i

Where is the data actually coming from in this example?

@EvanZ

EvanZ commented Apr 6, 2023

When I add the two lines to get world size and process rank to my __iter__ code, it freezes my script. :(

@awaelchli
Contributor

I don't understand how this code from that Colab notebook actually works:

For distributed training, each process will call the dataset independently:

iterator = iter(dataset)
process 0: next(iterator)
process 1: next(iterator)
...

If we don't put any distributed sampling inside our dataset, each process gets the same samples:
process 0: next(iterator), next(iterator), next(iterator) -> [0, 1, 2]
process 1: next(iterator), next(iterator), next(iterator) -> [0, 1, 2]

This would render data-parallel training completely useless. If we instead add the distributed sampler, each process returns different data:

process 0: next(iterator), next(iterator), next(iterator) -> [0, 2, 4]
process 1: next(iterator), next(iterator), next(iterator) -> [1, 3, 5]

You can study the output of the notebook cell to see the same thing.
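
As for where the data comes from: in the notebook class the yielded values are just indices. A sketch of the same flattening of (process rank, worker id) into one replica index, applied to an in-memory list standing in for real records; the record list and function names are placeholders, and the padding that DistributedSampler performs is left out:

```python
def flat_replica(process_rank, world_size, worker_id, num_workers):
    """Mirror the notebook's arithmetic: one replica per (process, worker) pair."""
    num_replicas = num_workers * world_size
    replica_rank = process_rank * num_workers + worker_id
    return replica_rank, num_replicas

def records_for(records, process_rank, world_size, worker_id, num_workers):
    replica_rank, num_replicas = flat_replica(process_rank, world_size, worker_id, num_workers)
    # The index stream selects which records this replica is responsible for.
    for index in range(replica_rank, len(records), num_replicas):
        yield records[index]

records = [f"sample-{i}" for i in range(8)]
for process_rank in range(2):
    for worker_id in range(2):
        print(process_rank, worker_id, list(records_for(records, process_rank, 2, worker_id, 2)))
```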

@EvanZ

EvanZ commented Apr 7, 2023

What I'm failing to understand is how in practice to pass the rank and world_size to the dataset when that is being created by my DataModule, before the Trainer is created. It seems that for this to work the Trainer is supposed to pass the rank somehow to the dataset. I can't figure out from your example notebook how to do this. When I try to access the rank and/or world_size in my Dataset before the Trainer is created, it either freezes during runtime or says I need to use init_process_group. It would be great to see a full example in the Lightning docs how to use multi-GPU with IterableDataset to make it more clear.

@keenjo

keenjo commented Apr 7, 2023

@EvanZ I was also confused about this at first, but then figured it out. The Trainer does not need any information about the data to be instantiated. So I would recommend instantiating the Trainer first, then you can pass the trainer.world_size and trainer.global_rank to your data module without any issues. Hope this helps!

@EvanZ

EvanZ commented Apr 7, 2023

One question that I guess seems obvious to you guys but not to me, do I have to explicitly call init_process_group? If so, where should that be done in a typical Lightning script? As far as I can tell you have to pass a rank, but how do you pass a rank if it hasn't been initiated? This is super confusing to me. I was assuming Lightning takes care of these details.

@EvanZ

EvanZ commented Apr 7, 2023

@EvanZ I was also confused about this at first, but then figured it out. The Trainer does not need any information about the data to be instantiated. So I would recommend instantiating the Trainer first, then you can pass the trainer.world_size and trainer.global_rank to your data module without any issues. Hope this helps!

Hmm indeed that is helpful (in theory haha). Currently my training script looks like:

        dm = MyIterableDataModule(
            train=args.train,
            valid=args.valid,
            test=args.test,
            vocab_path=args.vocab_path,
            max_length=args.max_length,
            num_workers=args.num_workers,
            batch_size=args.batch_size,
            label_name=args.label,
            exposure_name=args.exposure
        )
        dm.setup(stage='fit')

        model = MyLightningModel(
            path=args.pretrained_model_path,
            use_exposure_as_weights=args.weights,
            num_tokens=len(vocab),
            dim_model=args.embed_dims,
            dim_h=args.hidden_dims,
            num_layers=args.num_layers,
            num_heads=args.num_heads,
            lr=args.lr
        )

        trainer = pl.Trainer(
            fast_dev_run=args.fast,
            devices=args.gpus,
            accelerator=args.acc,
            callbacks=[early_stopping_cb,
                       checkpoint_cb,
                       TQDMProgressBar(refresh_rate=1)],
            precision=args.precision,
            strategy='ddp',
            reload_dataloaders_every_n_epochs=1,
            limit_train_batches=train_batches,
            limit_val_batches=valid_batches,
            limit_test_batches=test_batches,
            auto_lr_find=True,
            accumulate_grad_batches=args.acc_grads
        )

You're saying if I just flip the Trainer before the DataModule then I will be able to access the rank and the world size inside the Dataset?

@keenjo

keenjo commented Apr 7, 2023

Yes, that's exactly it!

@EvanZ

EvanZ commented Apr 7, 2023

Ok...maybe that's the missing detail I needed. I'll work on it some more!

@keenjo

keenjo commented Apr 7, 2023

I should also mention that, in the end, I solved this problem with a different strategy: using itertools.islice to avoid repeating data. My IterableDataset ended up looking like this:

import itertools

import torch
from torch.utils.data import IterableDataset

class CustomDataset(IterableDataset):
    def __init__(self, tokenizer, filepath, rank, world_size, stage='train'):
        super().__init__()
        self.tokenizer = tokenizer
        self.filepath = filepath
        self.stage = stage
        self.rank = rank
        self.world_size = world_size
    
    def __iter__(self):
        assert self.stage in ['train', 'val', 'test']
        worker_info = torch.utils.data.get_worker_info()
        # Note: with num_workers=0, worker_info is None and the stream below
        # is not sharded at all, not even across ranks.
        if worker_info is not None:
            num_workers = worker_info.num_workers
            worker_id = worker_info.id
            world_size = self.world_size
            rank = self.rank
        if self.stage == 'train':
            train_iter_source = open(f'{self.filepath}/train.source')
            train_iter_target = open(f'{self.filepath}/train.target')
            train_set = zip(train_iter_source, train_iter_target)
            mapped_itr = map(self.no_newlines, train_set)
            tok_itr = map(self.tokenize_inputs, mapped_itr)
        elif self.stage == 'val':
            val_iter_source = open(f'{self.filepath}/val.source')
            val_iter_target = open(f'{self.filepath}/val.target')
            val_set = zip(val_iter_source, val_iter_target)
            mapped_itr = map(self.no_newlines, val_set)
            tok_itr = map(self.tokenize_inputs, mapped_itr)
        elif self.stage == 'test':
            test_iter_source = open(f'{self.filepath}/test_both.source')
            test_iter_target = open(f'{self.filepath}/test_both.target')
            test_set = zip(test_iter_source, test_iter_target)
            mapped_itr = map(self.no_newlines, test_set)
            tok_itr = map(self.tokenize_inputs, mapped_itr)

        if worker_info is not None:
            # rank == 0 is just the general case with a zero offset, so one islice covers both.
            tok_itr = itertools.islice(tok_itr, worker_id + (num_workers * rank), None, (num_workers * world_size))

        return tok_itr
        
    def no_newlines(self, lines):
        '''
        Function to take new lines out of inputs
        '''
        lines = list(lines)

        for idx, line in enumerate(lines):
            lines[idx] = line.strip('\n')

        return lines

    def tokenize_inputs(self, lines):
        '''
        Function to tokenize a batch of lines that are read
        '''
        lines_tok = self.tokenizer.batch_encode_plus(lines,
                                                    return_special_tokens_mask=False,
                                                    add_special_tokens=False)['input_ids']
        return lines_tok

@EvanZ

EvanZ commented Apr 7, 2023

Hmm that's interesting and a different organization than I use. I define {train/val/test}_dataloader inside my DataModule. I do currently use islice as well though like this:

    def __iter__(self) -> Iterator[dict]:
        worker_total_num = torch.utils.data.get_worker_info().num_workers
        worker_id = torch.utils.data.get_worker_info().id
        for file in self.files:
            if self.compressed:
                fopen = BZ2File(filename=file, mode='r')
            else:
                fopen = open(file=file, mode='r')

            with fopen as f:
                for row in islice(f,worker_id,None,worker_total_num):
                    data = json.loads(row)
                    tokens = data['words']
                    random.shuffle(tokens)
                    tokens = tokens[:self.max_length]
                    indices, mask = self.tokens_to_indices(tokens)

                    item = {
                        'src': indices,
                        'mask': mask,
                        'label': data['label']
                    }

                    yield item

@EvanZ

EvanZ commented Apr 7, 2023

@keenjo Where do you pass in the rank and world size from? I assume you have a custom DataModule class somewhere?

@EvanZ

EvanZ commented Apr 7, 2023

If I try to use get_rank in the DataModule class, I get the following error:

Default process group has not been initialized, please make sure to call init_process_group

Even though I have instantiated the Trainer before the DataModule...Do I need to call init_process_group explicitly as well?

@EvanZ

EvanZ commented Apr 7, 2023

Basically anywhere in my script I try to call get_rank or get_world_size I get errors telling me to use init_process_group, but how can you initialize the process group if you don't have the rank? It seems like a chicken and egg scenario. What am I missing here?

@keenjo

keenjo commented Apr 7, 2023

I have a main training script which is where I get the initial rank and world size from and pass them to my DataModule:

    # Instantiate the trainer
    trainer = Trainer(accelerator='gpu',
                      devices=n_gpu,
                      precision=16,
                      val_check_interval=args.val_check_interval,
                      strategy='deepspeed_stage_2',
                      # logger=logger,
                      max_epochs=args.max_train_epochs,
                      callbacks=[checkpoint_callback,
                                 #grad_accumulation,
                                 lr_monitor,
                                 early_stop_callback,
                                 cb.TQDMProgressBar(),
                                 pred_writer])
    
    rank = trainer.global_rank
    world_size = trainer.world_size

    # Instantiate the data collator and data module
    collate_fn = DataCollatorForSeq2SeqWithMaskingAndPadding(tokenizer=tok, max_length=args.max_length, padding=True)
    dataset = LitRDF2TextDataModule(tokenizer=tok, train_batch_size=args.train_batch_size,
                                    eval_batch_size=args.eval_batch_size, collate_fn=collate_fn, 
                                    data_path=args.data_path, rank=rank, world_size=world_size, buffer_size=args.buffer_size)

Then within my DataModule I pass the rank and world size to the Iterable Dataset that I posted earlier:

# Pytorch Lightning Data Module
class LitRDF2TextDataModule(pl.LightningDataModule):
    """
    Class to prepare data and split it into Dataloaders
    - Look into Pytorch Lightning 1.9.4 Documentation for more information:
        - https://lightning.ai/docs/stable/
    """

    def __init__(self, tokenizer, train_batch_size, eval_batch_size, data_path, collate_fn, 
                 rank, world_size, buffer_size):
        super().__init__()
        self.tokenizer = tokenizer  # tokenizer to use on the dataset
        self.data_path = data_path  # path to dataset
        self.train_batch_size = train_batch_size  # train batch size
        self.eval_batch_size = eval_batch_size  # eval batch size
        self.collate_fn = collate_fn
        self.rank = rank
        self.world_size = world_size
        self.buffer_size = buffer_size


    def setup(self, stage):
        '''
        Method to prepare data to be passed to dataloaders
            - Specifically this creates a pytorch Dataset object for each split of the data
              and prepares the data for masking
        '''
        print(f'Preparing {stage} data...')
        if stage == 'fit' or stage == 'validation':
            self.dataset_train = CustomDataset(tokenizer=self.tokenizer,
                                               filepath=self.data_path,
                                               rank=self.rank,
                                               world_size=self.world_size,
                                               stage='train')
...
...

@EvanZ

EvanZ commented Apr 7, 2023

I'm confused. I thought the rank refers to each GPU. How does that work if you are calling it in the main script? How does it ever change? Are you sure your model is utilizing all the GPUs?

@keenjo

keenjo commented Apr 7, 2023

I've ensured that my model is using all of the GPUs by printing out the ranks during various processes. Plus, before I solved my problem I had the issue of my data being duplicated across all of the GPUs. I'm not exactly sure what could be causing your error, but just in case there is incompatibility between the lightning versions we are working with, I'm using pytorch-lightning version 1.9.4

@EvanZ

EvanZ commented Apr 7, 2023

I am using 1.9.4 as well.

It boggles my mind. The rank is 0 in the main script. You are passing that to your dataset. How can it ever change? I just don't get the logic.

@EvanZ

EvanZ commented Apr 12, 2023

I finally got everything to "work" in the sense that I can see batches of data being sent to each GPU, and the results appear similar to what I was getting before with just 1 GPU. The only problem is that I am not actually seeing any speedup. It seems all this effort was for naught. :/

@YiandLi

YiandLi commented Jul 25, 2023

class DataParallelIterableDataset(IterableDataset):

    def __len__(self):
        # Caveat: When using DistributedSampler, we need to know the number of samples in our dataset!
        # Hence, we need to implement `__len__`.
        return NUM_SAMPLES

    def __iter__(self):
        worker_info = get_worker_info()
        num_workers = worker_info.num_workers if worker_info is not None else 1
        worker_id = worker_info.id if worker_info is not None else 0
        
        world_size = get_world_size()
        process_rank = get_rank()

        sampler = DistributedSampler(self, num_replicas=(num_workers * world_size), rank=(process_rank * num_workers + worker_id), shuffle=False)

        for i in iter(sampler):
            yield i

I'd like to read a file line by line using this class, and I initialize an instance like this:

class MyIterableDataset(IterableDataset):
    """
    https://colab.research.google.com/drive/1OFLZnX9y5QUFNONuvFsxOizq4M-tFvk-?usp=sharing
    https://github.com/Lightning-AI/lightning/issues/15734
    """
    
    def __init__(self, path, args, encoder_tokenizer, ent_tokenizer, rel_tokenizer, ins_num,  mode):
        
        super(MyIterableDataset, self).__init__()
        self.path = path
        self.args = args
        self.mode = mode
        self.encoder_tokenizer = encoder_tokenizer
        self.ent_tokenizer = ent_tokenizer
        self.rel_tokenizer = rel_tokenizer
        self.ins_num = ins_num

How do I use the data path (self.path = path) inside the __iter__() function?

In other words, how do I replace the NUM_SAMPLES attribute in the given example with a file reader object?
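
For reading a file line by line, one option is to open the file inside __iter__ and let each replica skip the lines that belong to the others. A minimal sketch using only the standard library; iter_sharded_lines is a hypothetical helper whose body would sit in the dataset's __iter__ with self.path as the path, and replica_rank and num_replicas would be derived from the rank and worker info as in the class above:

```python
import os
import tempfile
from itertools import islice

def iter_sharded_lines(path, replica_rank, num_replicas):
    """Yield the lines of a text file that belong to one replica."""
    # Opening the file here, not in __init__, gives each worker its own handle.
    with open(path, encoding="utf-8") as handle:
        for line in islice(handle, replica_rank, None, num_replicas):
            yield line.rstrip("\n")

# Small demonstration on a throwaway file.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
    tmp.write("\n".join(f"line {i}" for i in range(6)) + "\n")

print(list(iter_sharded_lines(tmp.name, replica_rank=0, num_replicas=2)))
print(list(iter_sharded_lines(tmp.name, replica_rank=1, num_replicas=2)))
os.remove(tmp.name)
```

No dataset length is needed with this approach, so __len__ and NUM_SAMPLES drop out entirely.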

@denisbeslic

I’m also having difficulty understanding how the rank can change when it’s initially set to 0 in the main script and then passed to the dataset.

It boggles my mind. The rank is 0 in the main script. You are passing that to your dataset. How can it ever change? I just don't get the logic.

@EvanZ did you ever find an answer to your question? I’m facing a similar issue and would really appreciate some insight.

In the script @awaelchli provided, this seems to work in the non-Lightning version with:

  • mp.start_processes(fixed_data_parallel_with_iterable_dataset, nprocs=NUM_PROCESSES, start_method="fork")
  • init_process(rank).

However, I’m struggling to understand how to achieve the same behavior in the Lightning version, where mp.start_processes and init_process don’t seem to be utilized.
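
A likely explanation, stated here as an assumption rather than something confirmed in this thread: with strategy='ddp', Lightning starts one process per device and each process runs the training script itself, so trainer.global_rank read in the "main script" has a different value in each process. A way to avoid depending on that ordering is to look the rank up lazily inside __iter__. The sketch below assumes the launcher exports RANK and WORLD_SIZE (torchrun does; other launchers may not) and falls back to a single-process default; the function name and the env parameter are illustrative.

```python
import os

def resolve_rank_and_world_size(env=None):
    """Return (rank, world_size), preferring an initialized process group."""
    try:
        import torch.distributed as dist
        if dist.is_available() and dist.is_initialized():
            return dist.get_rank(), dist.get_world_size()
    except ImportError:
        pass  # torch not installed: fall through to the environment
    env = os.environ if env is None else env
    return int(env.get("RANK", 0)), int(env.get("WORLD_SIZE", 1))

# Outside a distributed run, only the environment fallback is exercised.
print(resolve_rank_and_world_size(env={}))
print(resolve_rank_and_world_size(env={"RANK": "1", "WORLD_SIZE": "2"}))
```

Called at the top of __iter__, this would presumably run after the trainer has set up the process group, whereas calling get_rank() while constructing the DataModule runs before it and produces the init_process_group error quoted earlier.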

@gil2rok

gil2rok commented Sep 4, 2024

I'm also quite confused by this. I hoped PyTorch Lightning would provide more clear guidance + examples (though the notebook is an excellent start!).

Would using worker_init_fn() help better organize the code for a DataParallelIterableDataset()?

See how it is used in the PyTorch documentation: https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset

@gil2rok

gil2rok commented Sep 4, 2024

A couple of quick questions about the DataParallelIterableDataset class provided in the notebook above:

class DataParallelIterableDataset(IterableDataset):

    def __len__(self):
        # Caveat: When using DistributedSampler, we need to know the number of samples in our dataset!
        # Hence, we need to implement `__len__`.
        return NUM_SAMPLES

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        num_workers = worker_info.num_workers if worker_info is not None else 1
        worker_id = worker_info.id if worker_info is not None else 0
        
        world_size = torch.distributed.get_world_size()
        process_rank = torch.distributed.get_rank()

        sampler = torch.utils.data.DistributedSampler(
            self, 
            num_replicas=num_workers * world_size, 
            rank=process_rank * num_workers + worker_id, 
            shuffle=False
        )

        for i in iter(sampler):
            yield i

  1. Why is there no __init__() method?
  2. When we are dealing with real data, where and how do we pass it in? Does it need to get passed to the DistributedSampler() somehow via self?
  3. When training LLMs, how should we incorporate the block_size (context length) into this dataset code? Does this change how we want to distribute the data among processes and GPUs?
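
On the block_size question, one common arrangement (an assumption here, not something the notebook covers) is to shard at the document level first and then pack each replica's token stream into fixed-length blocks, so that the sharding logic itself does not change. A small sketch with integer lists standing in for tokenized documents; all names are hypothetical:

```python
from itertools import chain, islice

def pack_into_blocks(token_stream, block_size):
    """Group a flat token stream into lists of exactly block_size tokens."""
    stream = iter(token_stream)
    while True:
        block = list(islice(stream, block_size))
        if len(block) < block_size:
            return  # drop the incomplete tail
        yield block

def blocks_for_replica(documents, replica_rank, num_replicas, block_size):
    # Shard whole documents across replicas, then concatenate and pack.
    my_docs = islice(documents, replica_rank, None, num_replicas)
    return pack_into_blocks(chain.from_iterable(my_docs), block_size)

docs = [[1, 2, 3], [4, 5], [6, 7, 8, 9], [10, 11, 12]]
print(list(blocks_for_replica(docs, 0, 2, block_size=4)))  # [[1, 2, 3, 6]]
print(list(blocks_for_replica(docs, 1, 2, block_size=4)))  # [[4, 5, 10, 11]]
```

Because documents differ in length, replicas can end up with different block counts, so the earlier advice about yielding the same amount of data on each rank still applies.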

@gdoras

gdoras commented Oct 31, 2024

There are 2 things that finally made this work for me:

  1. DistributedSampler is not a good candidate for iterable datasets, as it requires the length of the dataset (which is not always available in practice for an IterableDataset), as explained here:

Neither sampler nor batch_sampler is compatible with iterable-style datasets, since such datasets have no notion
of a key or an index.

Implement the sharding logic directly in a custom IterableDataset instead.

  2. Pass use_distributed_sampler=False to the Lightning Trainer. This is crucial, as the sharding is already taken care of.

import numpy as np
from torch.utils.data import IterableDataset, get_worker_info
import torch.distributed as tdi

class ToyIterableDataset(IterableDataset):
 
    def __init__(self, filepath):
        self.data = np.load(filepath)

    def __iter__(self):

        # devices split
        device_rank, num_devices = (tdi.get_rank(), tdi.get_world_size()) if tdi.is_initialized() else (0, 1)

        # workers split
        worker_info = get_worker_info()
        worker_rank, num_workers = (worker_info.id, worker_info.num_workers) if worker_info else (0, 1)

        # total (devices + workers) split by device, then by worker
        num_replicas = num_workers * num_devices
        replica_rank = worker_rank * num_devices + device_rank
        # by worker, then device would be:
        # rank = device_rank * num_workers + worker_rank

        for i, data in enumerate(self.data):
            if i % num_replicas == replica_rank:
                #print(f"Device: {device_rank}, worker {worker_rank} fetches sample {i}")
                yield i, data
            else:
                continue

    def __getitem__(self, index):
        raise RuntimeError('__getitem__ shall not be called on iterable dataset.')
