Loading data by pinning and DataParallel not working

Hello everyone,

Could you guys have a look on my problem.

I have two problems: data loading and DataParallel are not working.

To train densenet121 on 4 GPUs (Tesla V100) I use DataParallel. The code I use for these tests is from here as suggested here. I just customised this code. I even tried with smaller batches as suggested in this tutorial.

Here are my results for parallel training:

  • with 4 GPUs, 80 batch and 10 epochs: 7m 24s
  • with 1 GPU, 20 batch and 10 epochs: 5m 26s

Concerning pinning the loaded data to CPU, there is also no changes. In DataLoader I set ‘pin_memory = True’ and in cuda ‘non_blocking = True’.

Thank you for your precious considered time.

Here is my code:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu May 16 15:11:33 2019

"""
import os
import shutil
import time

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models


def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

def cuda_managment(device_count, text = str):
    print('\n')
    print(text)
    for d in range(device_count):
        #print('GPU {} allocated memory {}'.format(d, cuda.memory_allocated(d)/1e+9))
        print('GPU {} cached memory {}'.format(d, torch.cuda.max_memory_cached(d)/1e+9))
    print('\n')
    
     
          
def img_loader(train_or_test, transformers, batch_size, shuffle_data = True):
    # create dataset
    data = []
    for t in transformers:
        data.append(datasets.ImageFolder(os.path.join(data_dir, train_or_test), t))
    data = torch.utils.data.ConcatDataset(data)

    # create dataloader
    loader = torch.utils.data.DataLoader(data, batch_size=batch_size, shuffle=shuffle_data, 
                                         num_workers=20,
                                         pin_memory = True
                                         )
    
    return loader


def main_worker(num_classes, datadir, batch_size, train_loader, val_loader, 
                n_workers = 3, evaluate = False, epochs = 100, feature_extract = False):
    
    
    since = time.time()
    best_acc1 = 0.0
    
    # create model    
    model = models.densenet121(pretrained=False)
    set_parameter_requires_grad(model, feature_extract)
    num_ftrs = model.classifier.in_features
    model.classifier = nn.Linear(num_ftrs, num_classes)
    
    
    #no parallel traing
#    model = model.cuda(0)
    
    # parallel training
    model = nn.DataParallel(model).cuda()

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda(0)
    optimizer = torch.optim.SGD(model.parameters(), 
                                lr = 0.001,
                                momentum=0.9,
                               )
    
    if evaluate:
        validate(val_loader, model, criterion)
        return

    for epoch in range(epochs):
        cuda_managment(torch.cuda.device_count(), 'CUDA STATE')
        adjust_learning_rate(optimizer, epoch)

        # train for one epoch
        train(train_loader, model, criterion, optimizer, epoch)

        # evaluate on validation set
        acc1 = validate(val_loader, model, criterion)

        # remember best acc@1 and save checkpoint
        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)

       
        save_checkpoint({
            'epoch': epoch + 1,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
        }, is_best)
    
    
    time_elapsed = time.time() - since
    print('training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    
    
def train(train_loader, model, criterion, optimizer, epoch):
    batch_time = AverageMeter('Time', ':6.3f')
    data_time = AverageMeter('Data', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    progress = ProgressMeter(len(train_loader), batch_time, data_time, losses, top1,
                             top5, prefix="Epoch: [{}]".format(epoch))

    # switch to train mode
    model.train()
    end = time.time()
    
    for i, (input, target) in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)
        
        input = input.cuda(0, non_blocking = True)
        target = target.cuda(0, non_blocking=True)
#        input = input.cuda(0)
#        target = target.cuda(0)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        acc1, acc5 = accuracy(output, target, topk=(1, 2))
        losses.update(loss.item(), input.size(0))
        top1.update(acc1[0], input.size(0))
        top5.update(acc5[0], input.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        progress.print(i)


def validate(val_loader, model, criterion):
    batch_time = AverageMeter('Time', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    progress = ProgressMeter(len(val_loader), batch_time, losses, top1, top5,
                             prefix='Test: ')

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (input, target) in enumerate(val_loader):
            input = input.cuda(0, non_blocking=True)
            target = target.cuda(0, non_blocking=True)
#            input = input.cuda(0)
#            target = target.cuda(0)
            
            # compute output
            output = model(input)
            loss = criterion(output, target)

            # measure accuracy and record loss
            acc1, acc5 = accuracy(output, target, topk=(1, 2))
            losses.update(loss.item(), input.size(0))
            top1.update(acc1[0], input.size(0))
            top5.update(acc5[0], input.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            progress.print(i)

        # TODO: this should also be done with the ProgressMeter
        print(' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}'
              .format(top1=top1, top5=top5))

    return top1.avg




def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, 'model_best.pth.tar')


class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)


class ProgressMeter(object):
    def __init__(self, num_batches, *meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def print(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print('\t'.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'


def adjust_learning_rate(optimizer, epoch):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = 0.001 * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr


def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res







data_dir = '/home/bgv/Desktop/cresus ia/data/train_test'
batch_size = 20
num_epochs = 50
n_classes = 4
h = 224
w = 224
input_size = h, w


# normalisation
normalisation = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

# color
color_jitter_scb = transforms.ColorJitter(saturation = (0.2, 2), contrast= (0.2, 2), brightness = (0.3, 2))
color_jitter_s = transforms.ColorJitter(saturation = (0.2, 2))
color_jitter_c = transforms.ColorJitter(contrast= (0.2, 2))
color_jitter_b = transforms.ColorJitter(brightness = (0.3, 2))
color_jitter_random = transforms.RandomChoice([color_jitter_scb, color_jitter_s, color_jitter_c, color_jitter_b])

# rotation
rotation = transforms.RandomRotation(degrees = (-10,10), expand=False)
rotation_expand = transforms.RandomRotation(degrees = (-10,10), expand=True)
rotation_down = transforms.RandomRotation(degrees = (-179,-180), expand=False)
rotation_random = transforms.RandomChoice([rotation, rotation_expand])

original = transforms.Compose([transforms.Resize(input_size), transforms.ToTensor(), normalisation])
color_rotation = transforms.Compose([
                                    rotation_random,
                                    color_jitter_random,
                                    transforms.Resize(input_size),
                                    transforms.ToTensor(),
                                    normalisation
                                    ])

# train
transforms_ = [original, color_rotation]

img_count = 0
load_train = img_loader('train', transforms_, batch_size)
load_test = img_loader('test', transforms_, batch_size, shuffle_data = False)
img_loaders = {'train': load_train, 'test': load_test}

main_worker(n_classes, data_dir, batch_size, load_train, load_test, n_workers = 20, evaluate = False, epochs = num_epochs)



The code looks generally alright.
Have you tried to reduce the number of workers? 20 seems to be quite high (of course it’s depending on your system setup).

Also, could you check if DistributedDataParallel speeds up your training?

Thank you for you response.

I already tested with less number of workers, and to be sure I tested once again after your post. It takes more time with less workers (I set it to 10). My machine has 20 physical cores, it is NVIDIA DGX Station.

I tried:

torch.distributed.init_process_group(backend=“nccl”)
model = torch.nn.parallel.DistributedDataParallel(model)

but I got the following error:

ValueError: Error initializing torch.distributed using env:// rendezvous: environment variable RANK expected, but not set

If I understand it well, DataParallel does the same stuff or it is wrong?

They do the same, but with DataParallel you drive N devices from a single process, whereas with DistributedDataParallel you drive N devices from N processes. For the latter, those devices may be located in different machines, hence the distributed part. You’ll have to launch those N processes though, you cannot start a single process and have it work out of the box. You can start multiple processes manually (and set the RANK environment variable accordingly, per the error message you’re seeing), or use the torch.distributed.launch utility to launch processes for you.