I want to use my custom sampler (for example, I need oversampling and I want to use this repo: https://github.com/ufoym/imbalanced-dataset-sampler), but I already use DistributedSampler for the DataLoader, because I use multi-GPU training. How can I pass one more sampler to the DataLoader, or can I maybe do it via the Dataset? Currently I use a pretty simple ImageFolder dataset, and it would be cool if I didn't need to rewrite it.
You can implement a wrapper class for your dataset and do the sampling there. For example, if you were to combine DistributedSampler with SubsetRandomSampler, you could implement a dataset wrapper like this:
import torch

class DistributedIndicesWrapper(torch.utils.data.Dataset):
    """
    Utility wrapper so that torch.utils.data.distributed.DistributedSampler
    can work with train/test splits.
    """
    def __init__(self, dataset: torch.utils.data.Dataset, indices: torch.Tensor):
        self.dataset = dataset
        self.indices = indices

    def __len__(self):
        return self.indices.size(0)

    def __getitem__(self, item):
        # TODO: do the sampling here ?
        idx = self.indices[item]
        return self.dataset[idx]
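For reference, a minimal usage sketch of the wrapper (the dataset path, split ratio, epoch count, and variable names below are just assumptions for illustration, not part of the original post):

import torch
import torchvision

# hypothetical setup: wrap a random 80% train split and hand it to DistributedSampler
full_dataset = torchvision.datasets.ImageFolder("./data/train",
                                                transform=torchvision.transforms.ToTensor())
train_indices = torch.randperm(len(full_dataset))[:int(0.8 * len(full_dataset))]

train_subset = DistributedIndicesWrapper(full_dataset, train_indices)
sampler = torch.utils.data.distributed.DistributedSampler(train_subset)
loader = torch.utils.data.DataLoader(train_subset, batch_size=32, sampler=sampler)

for epoch in range(10):
    sampler.set_epoch(epoch)  # re-shuffle the per-process split each epoch
    for images, labels in loader:
        pass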
Thanks for the idea, danielhavir!
For everyone who is looking for an oversampling wrapper around a FolderDataset, you can take a look at this:
import random

import torch

class OversamplingWrapper(torch.utils.data.Dataset):
    def __init__(self, folder_dataset, oversampling_size=1000):
        self.folder_dataset = folder_dataset
        self.oversampling_size = oversampling_size
        self.num_classes = len(folder_dataset.classes)

        self.class_idx_to_sample_ids = {i: [] for i in range(self.num_classes)}
        for idx, (_, class_id) in enumerate(folder_dataset.samples):
            self.class_idx_to_sample_ids[class_id].append(idx)

    def __len__(self):
        return self.num_classes * self.oversampling_size

    def __getitem__(self, index):
        class_id = index % self.num_classes
        sample_idx = random.sample(self.class_idx_to_sample_ids[class_id], 1)
        return self.folder_dataset[sample_idx[0]]
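A minimal usage sketch, assuming an ImageFolder dataset on disk and the usual DDP setup (the path and loader settings are placeholders I picked for illustration):

import torch
import torchvision

dataset = torchvision.datasets.ImageFolder("./train",
                                           transform=torchvision.transforms.ToTensor())
balanced = OversamplingWrapper(dataset, oversampling_size=1000)
sampler = torch.utils.data.distributed.DistributedSampler(balanced)
loader = torch.utils.data.DataLoader(balanced, batch_size=64,
                                     sampler=sampler, num_workers=4)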
Hi,
I’ve got a similar goal for distributed training, only with WeightedRandomSampler and a custom torch.utils.data.Dataset.
I have 2 classes, positive (say 100) and negative (say 1000).
Each epoch, I want all positive examples, and an equal number of random negative samples.
ds = custom_dataset(args)

# inverse class-frequency weight per class: [1/n_positive, 1/n_negative]
weights = 1. / torch.tensor([ds.n_positive, ds.n_negative], dtype=torch.float)
# look up one weight per sample from its target label
samples_weights = weights[ds.all_targets]

WRsampler = WeightedRandomSampler(
    weights=samples_weights,
    num_samples=len(samples_weights),
    replacement=True
)
But I can’t figure this out.
If I want random negative samples regardless of batch_size, wouldn’t I need a DataLoader wrapper? How would I go about writing one?
Any hints or suggestions, much appreciated.
Help us @ptrblck, you’re our only hope. (and have you considered running for president 2020?)
Are you using nn.DistributedDataParallel as shown in this tutorial?
If so, I assume you are using a DistributedSampler to only use a valid subset of your dataset in each process?
In that case we should be able to add weighted sampling into the sampler, but let me know if my assumptions are correct before diving into it.
Hahaha, president of discuss-land?
I’ve been using PyTorch Lightning with the ‘ddp’ distributed data parallel backend and torch.utils.data.distributed.DistributedSampler(ds) as the DataLoader sampler argument. To be honest, I’m unsure of the subsetting that this represents, despite having a look at the source code, but happy to learn. Also happy to refactor for a clean, robust solution. Cheers
I’m not familiar with Lightning, but I assume it’s just using the torch.utils.data.DistributedSampler.
Based on the implementation of DistributedSampler and WeightedRandomSampler, this code might work:
import math

import torch
import torch.distributed as dist
from torch.utils.data import Sampler


class DistributedWeightedSampler(Sampler):
    def __init__(self, dataset, num_replicas=None, rank=None, replacement=True, shuffle=True):
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
        self.total_size = self.num_samples * self.num_replicas
        self.replacement = replacement
        self.shuffle = shuffle

    def calculate_weights(self, targets):
        class_sample_count = torch.tensor(
            [(targets == t).sum() for t in torch.unique(targets, sorted=True)])
        weight = 1. / class_sample_count.double()
        samples_weight = torch.tensor([weight[t] for t in targets])
        return samples_weight

    def __iter__(self):
        # deterministically shuffle based on epoch
        g = torch.Generator()
        g.manual_seed(self.epoch)
        if self.shuffle:
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # add extra samples to make it evenly divisible
        indices += indices[:(self.total_size - len(indices))]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        # get targets (you can alternatively pass them in __init__, if this op is expensive)
        targets = self.dataset.targets
        targets = targets[self.rank:self.total_size:self.num_replicas]
        assert len(targets) == self.num_samples

        weights = self.calculate_weights(targets)

        return iter(torch.multinomial(weights, self.num_samples, self.replacement).tolist())

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        self.epoch = epoch
This DistributedWeightedSampler will get the targets of your dataset, create the weights for the current split, and use torch.multinomial to sample from them as is done in the WeightedRandomSampler.
This code is untested and I just hacked it together, so please let me know if this would work at all or if you are seeing any issues.
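As a rough usage sketch (not from the original post): the sampler simply replaces the usual DistributedSampler in the DataLoader, and set_epoch is still called every epoch. The toy dataset below only exists to show the wiring, and the whole snippet assumes the process group is already initialized as in a normal DDP run:

import torch
from torch.utils.data import DataLoader, Dataset

# toy imbalanced dataset exposing a .targets tensor, as DistributedWeightedSampler expects
class ToyDataset(Dataset):
    def __init__(self):
        self.targets = torch.cat([torch.zeros(100, dtype=torch.long),
                                  torch.ones(1000, dtype=torch.long)])
        self.data = torch.randn(len(self.targets), 8)

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, idx):
        return self.data[idx], self.targets[idx]

dataset = ToyDataset()
sampler = DistributedWeightedSampler(dataset)        # the class defined above
loader = DataLoader(dataset, batch_size=32, sampler=sampler)

for epoch in range(3):
    sampler.set_epoch(epoch)   # keeps the per-epoch shuffling deterministic but different
    for data, target in loader:
        pass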
Above and beyond, as usual. Just need to verify what’s going through, but pretty sure this has done the trick. Thanks a million!
I have a slightly different but related question here. Is it possible to have a SequentialSampler followed by a DistributedSampler? I am not sure if this would work when using multiple GPUs, as the data could have been split randomly already.
The reason I am asking this question is that I would like to create a single dataloader from multiple data sources, and I would like each mini-batch of the dataloader to contain only one kind of data. This can easily be done if I create one dataloader for every single data source (and when training go through each of them one by one), but for my purpose here I am wondering if something similar can be achieved by only using one dataloader for all data sources.
Thanks again. This is working great, but it seems to be responsible for processes hanging / getting stuck on the GPU when the main script is terminated or ‘early-stopped’.
~python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 18 leaked semaphores to clean up at shutdown
Any hints as to how this can be cleaned up?
Cheers
Which code hangs or yields the semaphore warning?
Is it the DistributedWeightedSampler?
If the main training script gets early-stopped or I keyboard-interrupt it, my nvidia-smi memory usage on one of my (two) GPUs stays almost full and the GPU utilization stays at 100%. The semaphore warning appears after pkill python. This doesn’t seem to happen if I’m using any other sampler.
Thanks for the notice. As I haven’t tested the code, it might yield these side effects.
Could you take a look at this issue and see if this approach would better fit your needs?
Cheers. Pretty sure this was a rookie error: I forgot a torch.no_grad() in my validation loop. Hasn’t been an issue since adding that in. Thanks.
Superb, thanks very much for this @ptrblck.
Maybe I’m missing something here, but in the __iter__ function, shouldn’t it be
    targets = targets[indices]
or similar, rather than what we currently have:
    targets = targets[self.rank:self.total_size:self.num_replicas]
Otherwise don’t we just leave indices hanging in the breeze, not doing anything? Just realised we still need a way to map the selected targets back to the original dataset indices. I will post when I have something…
thanks again for your awesomeness
Yes, you are probably right. indices already has the extra samples and is also already subsampled, so it should be used instead of indexing the targets directly.
I’ll observe the linked issue on GitHub, as it should provide a cleaner way of implementing this behavior.
Thanks, I just looked at the linked issue and agree that it is exactly what I was looking for. It looks like it is not ready yet, so here is what I will use in the meantime…
def __iter__(self):
    # deterministically shuffle based on epoch
    g = torch.Generator()
    g.manual_seed(self.epoch)
    if self.shuffle:
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        indices = list(range(len(self.dataset)))

    # add extra samples to make it evenly divisible
    indices += indices[:(self.total_size - len(indices))]
    assert len(indices) == self.total_size

    # subsample
    indices = indices[self.rank:self.total_size:self.num_replicas]
    assert len(indices) == self.num_samples

    # get targets (you can alternatively pass them in __init__, if this op is expensive)
    targets = self.dataset.targets
    # select only the wanted targets for this subsample
    targets = torch.tensor(targets)[indices]
    assert len(targets) == self.num_samples

    # randomly sample this subset, producing balanced classes
    weights = self.calculate_weights(targets)
    subsample_balanced_indices = torch.multinomial(weights, self.num_samples, self.replacement)
    # now map these target indices back to the original dataset index...
    dataset_indices = torch.tensor(indices)[subsample_balanced_indices]

    return iter(dataset_indices.tolist())
Interesting thread.
If we want to do simple random sampling, wouldn’t something like this work? We just take a random sample of the whole dataset at the point of dataset creation, then wrap our Dataset in a DistributedSampler, which would take care of splitting it among processes.
We reload the Dataset every epoch so that a new sample is drawn.
class Dataset(torch.utils.data.Dataset):
    def __init__(self, seed, sample_size, *args, **kwargs):
        # self.ids is assumed to be populated from the data source before sampling
        random.seed(seed)
        random.shuffle(self.ids)
        self.ids = self.ids[:sample_size]

dataset = Dataset(seed, sample_size)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
This Dataset would sample the same data points in each process, wouldn’t it?
The original DistributedSampler will split the indices such that each process draws its own samples.
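To make that concrete, here is a small illustrative sketch (not from the thread) of the strided per-rank split that DistributedSampler performs internally, mirroring the __iter__ logic shown earlier; the helper name and the example sizes are made up:

import math

import torch

def split_indices(dataset_len, num_replicas, rank, epoch=0):
    # shuffle deterministically per epoch, identically in every process
    g = torch.Generator()
    g.manual_seed(epoch)
    indices = torch.randperm(dataset_len, generator=g).tolist()

    # pad so the index list is evenly divisible across replicas
    num_samples = int(math.ceil(dataset_len / num_replicas))
    total_size = num_samples * num_replicas
    indices += indices[:(total_size - len(indices))]

    # each rank takes a strided, disjoint slice
    return indices[rank:total_size:num_replicas]

# with 10 samples and 2 processes, the ranks see disjoint halves of the same shuffle
print(split_indices(10, num_replicas=2, rank=0))
print(split_indices(10, num_replicas=2, rank=1))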