How to combine two different samplers? MNIST

I am doing distributed training with the MNIST dataset. By default, MNIST is only split into a training set and a test set. I would like to further split the training set into a training set and a validation set.

I could do this as follows:

import numpy as np
import torch
from torchvision import datasets, transforms

# Shuffle the indices of the 60,000 MNIST training images
indices = np.arange(60000)
np.random.shuffle(indices)

mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Build the train loader from the first 55,000 shuffled indices
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('mnist', download=True, train=True, transform=mnist_transform),
    batch_size=64, shuffle=False,
    sampler=torch.utils.data.SubsetRandomSampler(indices[:55000]))

# Build the validation loader from the remaining 5,000 indices
val_loader = torch.utils.data.DataLoader(
    datasets.MNIST('mnist', download=True, train=True, transform=mnist_transform),
    batch_size=64, shuffle=False,
    sampler=torch.utils.data.SubsetRandomSampler(indices[55000:]))

But how do I incorporate distributed training? Before (without the split) I was doing it as follows:

# gets data from training-dir (s3 bucket path)
def _get_train_data_loader(batch_size, training_dir, is_distributed, **kwargs):
    logger.info("Get train data loader")
      
    dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
    return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None,
                                       sampler=train_sampler, **kwargs)

The problem is that I do not know how to pass two samplers to the DataLoader so that it does both things at once.

I found https://github.com/pytorch/pytorch/issues/23430, which seems to be related, but since I am a beginner I am not really able to make sense of it.


The cleanest approach would probably be to create a DistributedSubsetSampler, if you want to combine both samplers into one.
However, I think the easiest workaround would be to use DistributedSampler and wrap your Dataset in a Subset with the desired indices.
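Something along these lines (a minimal sketch, where dataset is your full MNIST training set and train_indices is a placeholder for the shuffled indices you want in the training split):

train_subset = torch.utils.data.Subset(dataset, train_indices)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_subset)
train_loader = torch.utils.data.DataLoader(train_subset, batch_size=64,
                                           shuffle=False, sampler=train_sampler)

DistributedSampler then only sees the indices of the Subset, so each rank draws from the training split without needing a second sampler.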


Thanks a lot @ptrblck for your awesome help! I did not know about Subset, which totally did the trick in one line. I am adding the solution below for future reference.

def _get_train_data_loader(batch_size, training_dir, is_distributed, indices, **kwargs):
    logger.info("Get train data loader")
    
    # Train = True, indices[:55000]
    dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
    
    train_ds = torch.utils.data.Subset(dataset, indices[:55000])
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_ds) if is_distributed else None
    train_loader = torch.utils.data.DataLoader(train_ds, batch_size=batch_size,
                                               shuffle=train_sampler is None,
                                               sampler=train_sampler, **kwargs)
    return train_loader

def _get_valid_data_loader(valid_batch_size, training_dir, indices, **kwargs):
    logger.info("Get valid data loader")
    
    # train = True, indices[55000:]
    dataset = datasets.MNIST(training_dir, train=True, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
    
    valid_ds = torch.utils.data.Subset(dataset, indices[55000:])
    valid_loader = torch.utils.data.DataLoader(valid_ds, batch_size=valid_batch_size, shuffle=True, **kwargs)
    return valid_loader

def _get_test_data_loader(test_batch_size, training_dir, **kwargs):
    logger.info("Get test data loader")
    
    # train = False
    dataset = datasets.MNIST(training_dir, train=False, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]))
    
    test_loader = torch.utils.data.DataLoader(dataset, batch_size=test_batch_size, shuffle=True, **kwargs)
    return test_loader

Note that I am calling these functions from the train function like this:

    indices = np.arange(60000)
    np.random.shuffle(indices)    
    train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, indices, **kwargs)
    valid_loader = _get_valid_data_loader(args.valid_batch_size, args.data_dir, indices, **kwargs)
    test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs)
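One detail the snippets above do not show: when training with DistributedSampler, you usually call set_epoch at the start of every epoch so that each epoch is reshuffled consistently across ranks. A minimal sketch of that inside the epoch loop (args.epochs is assumed to exist alongside the other args):

    for epoch in range(1, args.epochs + 1):
        # DistributedSampler reshuffles per epoch based on this value
        if isinstance(train_loader.sampler, torch.utils.data.distributed.DistributedSampler):
            train_loader.sampler.set_epoch(epoch)
        # ... run the training and validation steps for this epoch ...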

Hi @ptrblck,
How would I create the DistributedSubsetSampler?
I have to combine WeightedRandomSampler and DistributedSampler for batch balancing with DDP training.

This post and thread discuss the use case of combining both samplers, so you could take a look at some of the posted implementations.
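For reference, a combined sampler for the weighted case could look roughly like the sketch below (this is not taken from the linked thread; the class name, seed handling, and padding strategy are illustrative choices). It builds the same permutation on every rank, takes this rank's shard as DistributedSampler does, and then draws a weighted sample with replacement inside the shard:

import math
import torch
import torch.distributed as dist
from torch.utils.data import Sampler


class DistributedWeightedSampler(Sampler):
    """Sketch: shard the dataset across ranks, then sample within each shard
    according to per-sample weights (with replacement)."""

    def __init__(self, weights, num_replicas=None, rank=None, seed=0):
        if num_replicas is None:
            num_replicas = dist.get_world_size()
        if rank is None:
            rank = dist.get_rank()
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_replicas = num_replicas
        self.rank = rank
        self.seed = seed
        self.epoch = 0
        # round up so every rank draws the same number of samples
        self.num_samples = math.ceil(len(self.weights) / num_replicas)
        self.total_size = self.num_samples * num_replicas

    def __iter__(self):
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)
        # same permutation on every rank, so the per-rank shards are disjoint
        indices = torch.randperm(len(self.weights), generator=g).tolist()
        # pad to make the permutation evenly divisible, then take this rank's shard
        indices += indices[: self.total_size - len(indices)]
        shard = indices[self.rank:self.total_size:self.num_replicas]
        # weighted draw (with replacement) restricted to this shard
        picks = torch.multinomial(self.weights[shard], len(shard),
                                  replacement=True, generator=g)
        return iter([shard[i] for i in picks.tolist()])

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        self.epoch = epoch

You would construct it with one weight per sample and call set_epoch(epoch) at the start of every epoch, just as you would with DistributedSampler.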