Distributed Data Sampler

I have been using SpeechBrain's distributed sampler wrapper:

from operator import itemgetter

from torch.utils.data import DistributedSampler


class DistributedSamplerWrapper(DistributedSampler):
    """This wrapper allows using any sampler with Distributed Data Parallel (DDP) correctly.

    Passing blindly the sampler to each DDP process will cause to have access
    within each process to all the data in the dataset instead of only a subset
    of it which is unique to each process. This wrapper prevents this and
    allows to use only a subset of the original data for each process.

    NOTE
    ----
    This is automatically applied to any sampler in the Brain class when DDP
    training is used.
    """

    def __init__(self, sampler, *args, **kwargs):
        # DistributedSampler only calls len() on dataset
        # so a sampler is fine to pass there, as well.
        super().__init__(dataset=sampler, *args, **kwargs)
        self.sampler = sampler

    def __iter__(self):
        # It is easiest to use a random access interface to the wrapped
        # sampler's indices, so we just fetch all indices from the wrapped
        # sampler
        sampler_indices = list(self.sampler.__iter__())
        print(sampler_indices)  # debug print added for this question
        indices_of_indices = super().__iter__()
        # Itemgetter fetches the wrapped sampler indices from the positions
        # pointed to by DistributedSampler
        return iter(itemgetter(*indices_of_indices)(sampler_indices))

    def set_epoch(self, epoch):
        """Pass set_epoch() through to DistributedSampler and the wrapper one"""
        super().set_epoch(epoch)
        if hasattr(self.sampler, "set_epoch"):
            self.sampler.set_epoch(epoch)
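
To make the index mapping in __iter__ concrete, here is a minimal standalone sketch using plain lists. The values of sampler_indices and indices_of_indices are made up for illustration; in the wrapper they come from the wrapped sampler and from DistributedSampler respectively.

```python
from operator import itemgetter

# Hypothetical output of the wrapped sampler (dataset indices in sampling order).
sampler_indices = [40, 10, 30, 20, 50, 60]

# Hypothetical positions assigned to one process by DistributedSampler.
indices_of_indices = [0, 2, 4]

# itemgetter picks the wrapped sampler's entries at those positions.
selected = list(itemgetter(*indices_of_indices)(sampler_indices))
print(selected)  # [40, 30, 50]
```

Note that itemgetter returns a bare item rather than a tuple when it is given a single position, so this pattern implicitly assumes each process receives at least two positions.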

######################################
I'm trying to print the sampler_indices variable, but it is printed twice. What could be the reason for that?

I guess you might be running in a DistributedDataParallel setup where each process uses a single device. If so, each process will also initialize its own sampler, and the print statement should be visible num_processes times.

When I use 2 devices, it prints 4 times, and when I use one device, it prints 2 times.

How many workers are you using and how many calls are you expecting to __iter__?

As you rightly mentioned earlier, I'm using a DDP setup. I have 2 GPUs in a single node.

I'm expecting __iter__ to be called once per process. In my case I have 2 processes, so each should call __iter__ once, and that call should return an iterator.

What's happening is the following.
For a single process:

  1. The variable gets printed
  2. The iterator is not returned
  3. The variable gets printed again
  4. The iterator is returned

I feel like I'm probably missing something here, though the overall outcome (each process obtaining a different subset of samples) is as expected.
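
One way to narrow down where the second call comes from is to record the process id and the call stack each time __iter__ runs. The sketch below uses a hypothetical TracingSampler written in plain Python (it is not part of SpeechBrain or PyTorch); the same recording lines could be placed at the top of the wrapper's __iter__.

```python
import os
import traceback


class TracingSampler:
    """Hypothetical stand-in for a sampler that records who calls __iter__."""

    def __init__(self, indices):
        self.indices = list(indices)
        self.calls = []  # one (pid, stack) entry per __iter__ call

    def __iter__(self):
        # Record the process id and the caller's stack for later inspection.
        stack = traceback.format_stack(limit=5)
        self.calls.append((os.getpid(), stack))
        print(f"pid={os.getpid()} __iter__ call #{len(self.calls)}")
        return iter(self.indices)

    def __len__(self):
        return len(self.indices)


sampler = TracingSampler(range(4))
first = list(sampler)   # first call to __iter__
second = list(sampler)  # a second, independent call to __iter__
```

Printing the recorded stacks (for example "".join(sampler.calls[1][1])) shows which caller created the second iterator, and comparing the pids shows whether both calls happened in the same process.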

Thanks for the update. That's strange, and I cannot reproduce the issue using the native DistributedSampler.
If I add a debug print statement here to print the indices, I see nproc_per_node outputs, i.e. using 2 GPUs via --nproc_per_node=2 prints the indices 2 times, --nproc_per_node=8 prints them 8 times, etc.

Thanks for the help.
I will update this thread if I'm able to find the reason behind this behavior.

That sounds good. One debugging step could be to check if you are seeing the expected behavior using the native DistributedSampler. Could you add the print statements into the same line of code in your local installation and rerun your code?

    indices = indices[self.rank:self.total_size:self.num_replicas]
    print(indices)
    assert len(indices) == self.num_samples
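
For reference, the slice in that snippet gives each rank every num_replicas-th index, starting at its own rank. Here is a minimal sketch with plain lists, where shard_indices is a hypothetical helper and the padding and shuffling done by the real sampler are left out:

```python
def shard_indices(indices, rank, num_replicas):
    """Return the subset of indices assigned to one rank via a strided slice."""
    # Assumes len(indices) is already a multiple of num_replicas.
    total_size = len(indices)
    return indices[rank:total_size:num_replicas]


indices = list(range(8))
shards = [shard_indices(indices, rank, num_replicas=2) for rank in range(2)]
print(shards)  # [[0, 2, 4, 6], [1, 3, 5, 7]]
```

Each rank should therefore print a different, non-overlapping list, which makes it easy to tell a second process apart from a repeated call within the same process.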

I tried as you suggested; the indices are printed twice for a single process.

This is my dataloader:
sampler = DistributedSamplerWrapper(DynamicBatch(path))  # Wrapper code I have pasted earlier
DataLoader(dataset, collate_fn=collate_wrapper, batch_sampler=sampler, num_workers=16)