How to use my own sampler when I already use DistributedSampler?

I want to use my custom sampler (for example, I need oversampling and I want to use this repo: https://github.com/ufoym/imbalanced-dataset-sampler), but I already use DistributedSampler for the DataLoader, because I use multi-GPU training. How can I pass one more sampler to the DataLoader, or maybe I can do it via the Dataset? Currently, I use a pretty simple ImageFolder dataset and it would be cool if I didn’t need to rewrite it.

6 Likes

You can implement a wrapper class for your dataset and do the sampling there. For example, if you wanted to combine DistributedSampler with SubsetRandomSampler, you could implement a dataset wrapper like this:

import torch


class DistributedIndicesWrapper(torch.utils.data.Dataset):
    """
    Utility wrapper so that torch.utils.data.distributed.DistributedSampler
    can work with train/test splits.
    """
    def __init__(self, dataset: torch.utils.data.Dataset, indices: torch.Tensor):
        self.dataset = dataset
        self.indices = indices

    def __len__(self):
        return self.indices.size(0)

    def __getitem__(self, item):
        # TODO: do the sampling here ?
        idx = self.indices[item]
        return self.dataset[idx]
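
You would then wrap your dataset and pass the wrapper, together with the DistributedSampler, to the DataLoader. A rough (untested) sketch, where full_dataset and the split size are placeholders:

import torch

# full_dataset is a placeholder, e.g. an ImageFolder dataset
train_indices = torch.randperm(len(full_dataset))[:50000]  # indices of the train split
train_set = DistributedIndicesWrapper(full_dataset, train_indices)
sampler = torch.utils.data.distributed.DistributedSampler(train_set)
loader = torch.utils.data.DataLoader(train_set, batch_size=64, sampler=sampler)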
1 Like

Thanks for the idea, danielhavir!

For everyone who is looking for an oversampling wrapper around an ImageFolder dataset, you can have a look at this:

import random

import torch


class OversamplingWrapper(torch.utils.data.Dataset):
    def __init__(self, folder_dataset, oversampling_size=1000):
        self.folder_dataset = folder_dataset
        self.oversampling_size = oversampling_size
        self.num_classes = len(folder_dataset.classes)

        self.class_idx_to_sample_ids = {i: [] for i in range(self.num_classes)}
        for idx, (_, class_id) in enumerate(folder_dataset.samples):
            self.class_idx_to_sample_ids[class_id].append(idx)

    def __len__(self):
        return self.num_classes * self.oversampling_size

    def __getitem__(self, index):
        class_id = index % self.num_classes
        sample_idx = random.choice(self.class_idx_to_sample_ids[class_id])
        return self.folder_dataset[sample_idx]
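
For reference, putting it together with DistributedSampler would look roughly like this (untested sketch; the path and batch size are placeholders):

import torch
import torchvision

folder_dataset = torchvision.datasets.ImageFolder("path/to/train")  # placeholder path
train_set = OversamplingWrapper(folder_dataset, oversampling_size=1000)
sampler = torch.utils.data.distributed.DistributedSampler(train_set)
loader = torch.utils.data.DataLoader(train_set, batch_size=64, sampler=sampler)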
3 Likes

Hi,

I’ve got a similar goal for distributed training, only with WeightedRandomSampler and a custom torch.utils.data.Dataset.
I have 2 classes, positive (say 100 samples) and negative (say 1000).
Each epoch, I want all positive examples and an equal number of random negative samples.

ds = custom_dataset(args)
 
weights = 1. / torch.tensor([ds.n_positive, ds.n_negative], dtype=torch.float)
samples_weights = weights[ds.all_targets]
WRsampler = WeightedRandomSampler(
        weights=samples_weights,
        num_samples=len(samples_weights),
        replacement=True
        )
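
In a single-process setup I would just pass this sampler to the DataLoader, e.g. (batch size just for illustration):

loader = torch.utils.data.DataLoader(ds, batch_size=32, sampler=WRsampler)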

But I can’t figure out how to combine this with distributed training.
If I want random negative samples regardless of batch_size, wouldn’t I need a DataLoader wrapper? How would I go about writing one?
Any hints or suggestions, much appreciated.

1 Like

Help us @ptrblck, you’re our only hope. (and have you considered running for president 2020?)

3 Likes

Are you using nn.DistributedDataParallel as shown in this tutorial?
If so, I assume you are using a DistributedSampler to only use a valid subset of your dataset in each process?

In that case we should be able to add weighted sampling into the sampler, but let me know if my assumptions are correct before diving into it. :wink:
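
(For reference, the kind of setup I’m assuming looks roughly like this sketch; my_model and train_dataset are placeholders for your module and dataset, and it assumes a single-node launch with one process per GPU.)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

dist.init_process_group(backend="nccl")  # started e.g. via torch.distributed.launch
rank = dist.get_rank()
torch.cuda.set_device(rank)

model = DDP(my_model.to(rank), device_ids=[rank])  # my_model: your nn.Module
sampler = DistributedSampler(train_dataset)        # train_dataset: your Dataset
loader = DataLoader(train_dataset, batch_size=64, sampler=sampler)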

Hahaha, president of discuss-land? :smiley:

2 Likes

I’ve been using PyTorch Lightning with the ‘ddp’ distributed data parallel backend and torch.utils.data.distributed.DistributedSampler(ds) as the DataLoader sampler argument. To be honest, I’m unsure of the subsetting that this represents, despite having had a look at the source code, but I’m happy to learn. Also happy to refactor for a clean, robust solution. Cheers

I’m not familiar with Lightning, but I assume it’s just using torch.utils.data.distributed.DistributedSampler under the hood.
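For context, the subsetting it performs per process is basically: shuffle all dataset indices with an epoch-seeded generator, pad them so they divide evenly across processes, and take a rank-strided slice. A simplified sketch (distributed_subset is just an illustrative helper name, not part of the API):

import math

import torch


def distributed_subset(dataset_len, num_replicas, rank, epoch):
    # simplified view of the indices DistributedSampler yields for one process
    g = torch.Generator()
    g.manual_seed(epoch)
    indices = torch.randperm(dataset_len, generator=g).tolist()
    num_samples = int(math.ceil(dataset_len / num_replicas))
    total_size = num_samples * num_replicas
    indices += indices[:(total_size - len(indices))]  # pad so every rank gets num_samples
    return indices[rank:total_size:num_replicas]      # this rank's subset
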
Based on the implementation of DistributedSampler and WeightedRandomSampler, this code might work:

import math

import torch
import torch.distributed as dist
from torch.utils.data import Sampler


class DistributedWeightedSampler(Sampler):
    def __init__(self, dataset, num_replicas=None, rank=None, replacement=True, shuffle=True):
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
        self.total_size = self.num_samples * self.num_replicas
        self.replacement = replacement
        self.shuffle = shuffle

    def calculate_weights(self, targets):
        class_sample_count = torch.tensor(
            [(targets == t).sum() for t in torch.unique(targets, sorted=True)])
        weight = 1. / class_sample_count.double()
        samples_weight = torch.tensor([weight[t] for t in targets])
        return samples_weight

    def __iter__(self):
        # deterministically shuffle based on epoch
        g = torch.Generator()
        g.manual_seed(self.epoch)
        if self.shuffle:
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # add extra samples to make it evenly divisible
        indices += indices[:(self.total_size - len(indices))]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        # get targets (you can alternatively pass them in __init__, if this op is expensive)
        targets = self.dataset.targets
        targets = targets[self.rank:self.total_size:self.num_replicas]
        assert len(targets) == self.num_samples
        weights = self.calculate_weights(targets)

        return iter(torch.multinomial(weights, self.num_samples, self.replacement).tolist())

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        self.epoch = epoch

This DistributedWeightedSampler will get the targets of your dataset, create the weights for the current split, and use torch.multinomial to draw the sample indices, as is done in WeightedRandomSampler.
This code is untested and I just hacked it together, so please let me know if it works at all or if you are seeing any issues.
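
Usage would then be the same as with DistributedSampler, roughly (sketch; train_dataset and num_epochs are placeholders):

sampler = DistributedWeightedSampler(train_dataset)
loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=64, sampler=sampler, num_workers=4)

for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # reseeds the shuffling deterministically per epoch
    for data, target in loader:
        ...  # training step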

6 Likes

Above and beyond, as usual. Just need to verify what’s going through it, but pretty sure this has done the trick. Thanks a million!

1 Like

I have a slightly different but related question here. Is it possible to have a SequentialSampler followed by a DistributedSampler? I am not sure if this would work when using multiple GPUs, as the data could have been split randomly already.

The reason I am asking is that I would like to create a single dataloader from multiple data sources, and I would like each mini-batch of the dataloader to contain only one kind of data. This can easily be done if I create one dataloader for every single data source (and, when training, go through each of them one by one), but for my purpose here I am wondering if something similar can be achieved by using only one dataloader for all data sources. One rough idea is sketched below.
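
One (untested) idea, sketched under the assumption that the sources can go into a ConcatDataset and that the distributed splitting is handled separately: concatenate the datasets and use a custom batch sampler that only ever yields indices from one source per batch.

import random

from torch.utils.data import ConcatDataset, DataLoader, Sampler


class PerSourceBatchSampler(Sampler):
    """Yields batches whose indices all come from a single source dataset."""

    def __init__(self, dataset_sizes, batch_size):
        self.batch_size = batch_size
        # build index ranges into the ConcatDataset, one list per source
        self.index_groups = []
        offset = 0
        for size in dataset_sizes:
            self.index_groups.append(list(range(offset, offset + size)))
            offset += size

    def __iter__(self):
        batches = []
        for group in self.index_groups:
            random.shuffle(group)
            # chunk each source's indices into batches of batch_size
            for i in range(0, len(group), self.batch_size):
                batches.append(group[i:i + self.batch_size])
        random.shuffle(batches)  # interleave the sources across the epoch
        return iter(batches)

    def __len__(self):
        return sum((len(g) + self.batch_size - 1) // self.batch_size
                   for g in self.index_groups)


# usage sketch (source_a, source_b are placeholders for the per-source Datasets)
# concat = ConcatDataset([source_a, source_b])
# batch_sampler = PerSourceBatchSampler([len(source_a), len(source_b)], batch_size=32)
# loader = DataLoader(concat, batch_sampler=batch_sampler)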

Thanks again. This is working great, but it seems to be responsible for processes hanging / getting stuck on the GPU when the main script is terminated or ‘early-stopped’.

~python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 18 leaked semaphores to clean up at shutdown

Any hints as to how this can be cleaned up?
Cheers

Which code hangs or yields the semaphore warning?
Is it the DistributedWeightedSampler?

If the main training script gets early-stopped or I keyboard-interrupt it, the nvidia-smi memory usage on one of my (two) GPUs stays almost full and GPU utilization stays at 100%. The semaphore warning appears after pkill python. This doesn’t seem to happen if I’m using any other sampler.

Thanks for the notice. As I haven’t tested the code, it might yield these side effects.
Could you take a look at this issue and see if this approach would better fit your needs?

Cheers. Pretty sure this was a rookie error: I forgot a torch.no_grad() for my validation loop. It hasn’t been an issue since adding that in. Thanks.
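
For reference, the fix was just wrapping the validation loop, roughly like this (dummy model and data only for illustration):

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

model = nn.Linear(10, 2)
val_loader = DataLoader(
    TensorDataset(torch.randn(64, 10), torch.randint(0, 2, (64,))), batch_size=16)

model.eval()
with torch.no_grad():  # no autograd graph is built, so activations are freed right away
    for data, target in val_loader:
        output = model(data)
        loss = nn.functional.cross_entropy(output, target)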

Superb, thanks very much for this @ptrblck.
Maybe I’m missing something here, but in the __iter__ function, shouldn’t it be
targets = targets[indices]
or similar, rather than what we currently have:
targets = targets[self.rank:self.total_size:self.num_replicas]
Otherwise don’t we just leave indices hanging in the breeze, not doing anything? I’ve also just realised we still need a way to map the selected targets back to the original dataset indices. I will post when I have something…
thanks again for your awesomeness

Yes, you are probably right. indices already contains the extra samples and is already subsampled, so it should be used instead of indexing the targets directly.

I’ll observe the linked issue on GitHub, as it should provide a cleaner way of implementing this behavior. :wink:

Thanks, I just looked at the linked issue and agree that it is exactly what I was looking for. It looks like it is not ready yet, so here is what I will use in the meantime:

def __iter__(self):
    # deterministically shuffle based on epoch
    g = torch.Generator()
    g.manual_seed(self.epoch)
    if self.shuffle:
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        indices = list(range(len(self.dataset)))

    # add extra samples to make it evenly divisible
    indices += indices[:(self.total_size - len(indices))]
    assert len(indices) == self.total_size

    # subsample
    indices = indices[self.rank:self.total_size:self.num_replicas]
    assert len(indices) == self.num_samples

    # get targets (you can alternatively pass them in __init__, if this op is expensive)
    targets = self.dataset.targets
    # select only the targets for this rank's subsample
    targets = torch.tensor(targets)[indices]
    assert len(targets) == self.num_samples
    # randomly sample this subset, producing balanced classes
    weights = self.calculate_weights(targets)
    subsample_balanced_indices = torch.multinomial(weights, self.num_samples, self.replacement)
    # now map these subsample indices back to the original dataset indices...
    dataset_indices = torch.tensor(indices)[subsample_balanced_indices]

    return iter(dataset_indices.tolist())
2 Likes

Interesting thread.
If we want simple random sampling, wouldn’t something like this work? We just take a random sample of our whole dataset at the point of dataset creation, then pass our Dataset to DistributedSampler, which takes care of splitting it among processes.
We reload the Dataset every epoch so that a new sample is drawn.

import random

import torch


class Dataset(torch.utils.data.Dataset):
    def __init__(self, seed, sample_size, *args, **kwargs):
        # assume self.ids has been populated with all sample ids at this point
        random.seed(seed)
        random.shuffle(self.ids)
        self.ids = self.ids[:sample_size]


dataset = Dataset(seed, sample_size)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)

This Dataset would sample the same data points in each process, wouldn’t it?
The original DistributedSampler will then split those indices such that each process draws its own samples.