Is my DistributedDataParallel code correct? Why is DistributedDataParallel's performance worse than nn.DataParallel?

Hi, I am new to PyTorch DistributedDataParallel. I am working on object detection: my model reaches 79 mAP with nn.DataParallel, but only 50+ mAP with DistributedDataParallel.
In both setups every parameter is the same except for the batch size. For nn.DataParallel the batch size is 32 on 4 GPUs; for DistributedDataParallel the batch size is 8 per GPU on 4 GPUs, so the total batch size is the same. Is my approach correct? Why is DistributedDataParallel performing worse?

Here is the script that uses nn.DataParallel:

#train.py

import os
os.environ['CUDA_VISIBLE_DEVICES']='0,1,2,3'
import torch
from model_dla34 import get_pose_net
from loss import CtdetLoss,AverageMeter
from torch.utils.data import DataLoader
from datasets import My_certernet_datasets
import time

# Training configuration read by main() below.
datasets = 'pascal'              # dataset name handed to My_certernet_datasets
batch_size = 32                  # batch size given to the DataLoader
start_epoch, end_epoch = 0, 70   # the loop runs epochs start_epoch+1 .. end_epoch
init_lr = 1.25e-4                # learning rate the Adam optimizer starts with

def main():
    """Train the pose network, wrapping it in nn.DataParallel when several GPUs exist.

    Builds the training DataLoader, the model, the CtdetLoss criterion and an
    Adam optimizer, then for every epoch in ``start_epoch+1 .. end_epoch``
    adjusts the learning rate, trains one epoch and, every 10th epoch,
    writes a checkpoint.
    """
    # `device` is not defined anywhere in the visible script, so resolve it
    # here instead of failing with a NameError on first use.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train_data = My_certernet_datasets(mode = 'train',datasets = datasets)
    print('there are {} train images'.format(len(train_data)))
    train_data_loader = DataLoader(dataset=train_data,
                                   num_workers=16,
                                   batch_size=batch_size,
                                   shuffle=True,
                                   pin_memory=True
                                   )
    model = get_pose_net()
    if torch.cuda.device_count() > 1:
        model = torch.nn.DataParallel(model)
    model = model.to(device)
    criterion = CtdetLoss()
    optimizer = torch.optim.Adam(model.parameters(), init_lr)
    for epoch in range(start_epoch+1,end_epoch+1):
        adjust_lr(optimizer,epoch,init_lr)
        train(train_data_loader,model,criterion,optimizer,device,epoch,end_epoch)
        if epoch % 10 == 0:
            # The same path is used every time, so each checkpoint replaces
            # the previous one.
            # NOTE(review): `save_dir` is not defined in the visible script;
            # confirm where it is supposed to come from.
            save_model(save_dir + '/model_last.pth',epoch,model,optimizer = None)

def train(train_data_loader,model,criterion,optimizer,device,epoch,end_epoch):
    """Run one training epoch and print a progress line after every batch.

    Args:
        train_data_loader: sized iterable yielding dict-like batches that
            contain at least an 'input' entry.
        model: network to optimise; switched to training mode first.
        criterion: callable ``criterion(output, batch)`` returning a mapping
            with a 'loss' entry.
        optimizer: optimizer stepped once per batch.
        device: device every batch entry except 'meta' is moved to.
        epoch: current epoch number (used for logging and the time estimate).
        end_epoch: last epoch number (used for the time estimate).
    """
    meter = AverageMeter()
    model = model.train()
    for step, batch in enumerate(train_data_loader):
        tic = time.time()
        # Everything except the 'meta' entry is moved to the target device.
        for key in batch:
            if key == 'meta':
                continue
            batch[key] = batch[key].to(device)
        prediction = model(batch['input'])
        loss = criterion(prediction, batch)['loss']
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        elapsed = time.time() - tic
        total_steps = len(train_data_loader)
        # Rough estimate: this batch's duration times the batches of all
        # epochs that follow the current one.
        remaining = elapsed * (end_epoch - epoch) * total_steps
        eta_text = time.strftime('%H:%M:%S', time.gmtime(remaining))
        meter.update(loss.item())
        header = '[epoch:{},{}/{}]'.format(epoch, step, total_steps)
        print(header, 'current_loss:%.4f' % meter.current,
              'average_loss:%.4f' % meter.avg, 'ELA_time:', eta_text)

def adjust_lr(optimizer,epoch,init_lr,lr_step=(45,60)):
    """Apply the step learning-rate schedule for ``epoch``.

    When ``epoch`` is the k-th milestone in ``lr_step`` (counting from 1),
    the learning rate of every parameter group is set to
    ``init_lr * 0.1**k`` and the new value is printed. On any other epoch
    the optimizer is left untouched.

    Args:
        optimizer: object exposing ``param_groups``, a list of dicts that
            hold an 'lr' entry.
        epoch: current epoch number.
        init_lr: learning rate before any drop.
        lr_step: milestone epochs, in the order the drops should happen.
            The default is a tuple so no mutable object is shared between
            calls; any iterable of epochs is accepted.
    """
    milestones = list(lr_step)
    # Milestones are matched exactly, so a drop is applied only on the epoch
    # that equals a milestone.
    if epoch not in milestones:
        return
    lr = init_lr*(0.1**(milestones.index(epoch) + 1))
    print('Drop LR to',lr)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

def save_model(path,epoch,model,optimizer = None):
    """Write a checkpoint dict to ``path`` with ``torch.save``.

    The checkpoint holds 'epoch' and 'state_dict'; an 'optimizer' entry is
    added only when an optimizer is passed.

    Args:
        path: destination file.
        epoch: epoch number stored in the checkpoint.
        model: model to save; for an nn.DataParallel wrapper the state dict
            of the wrapped ``.module`` is stored.
        optimizer: optional optimizer whose ``state_dict()`` is stored too.
    """
    # Save the underlying network, not the DataParallel wrapper.
    target = model.module if isinstance(model, torch.nn.DataParallel) else model
    checkpoint = {'epoch': epoch, 'state_dict': target.state_dict()}
    if optimizer is not None:
        checkpoint['optimizer'] = optimizer.state_dict()
    torch.save(checkpoint, path)

# Script entry point: start training only when the file is executed directly.
if __name__ == "__main__":
    main()

Here is the script that uses DistributedDataParallel:

#train_DDP.py

import os
os.environ['CUDA_VISIBLE_DEVICES']='0,1,2,3'
import torch
from model_dla34 import get_pose_net
from loss import CtdetLoss,AverageMeter
from torch.utils.data import DataLoader
from datasets import My_certernet_datasets
import time
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel

# Join the process group first; the rank queried below is only available
# after initialisation.
torch.distributed.init_process_group(backend='nccl')
datasets = 'pascal'
batch_size = 8   # batch size of the DataLoader built in each process
start_epoch = 0
end_epoch = 70
init_lr = 1.25e-4

# torch.distributed.get_rank() is the GLOBAL rank, which is a valid GPU index
# only on a single node. Prefer the LOCAL_RANK variable exported by the
# launcher and otherwise fall back to the global rank modulo the number of
# visible GPUs (identical to the old value on one node).
local_rank = int(os.environ.get(
    'LOCAL_RANK',
    torch.distributed.get_rank() % max(torch.cuda.device_count(), 1)))
torch.cuda.set_device(local_rank)
device = torch.device('cuda',local_rank)

def main():
    """Train the pose network with DistributedDataParallel, one process per GPU.

    Builds the model (wrapped in DistributedDataParallel when more than one
    GPU is visible), a DataLoader driven by a DistributedSampler, the
    CtdetLoss criterion and an Adam optimizer. For every epoch in
    ``start_epoch+1 .. end_epoch`` it adjusts the learning rate, trains one
    epoch and, every 10th epoch, writes a checkpoint from the process whose
    ``local_rank`` is 0.
    """
    model = get_pose_net()
    model.to(device)
    if torch.cuda.device_count() > 1:
        print("let's use GPU{}!!!".format(local_rank))
        model = DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank,find_unused_parameters=True)
    train_data = My_certernet_datasets(mode = 'train',datasets = datasets)
    print('there are {} train images'.format(len(train_data)))
    # Keep a handle on the sampler that the DataLoader actually uses, so the
    # per-epoch set_epoch() call below reaches it.
    train_sampler = DistributedSampler(train_data)
    train_data_loader = DataLoader(dataset=train_data,
                                   num_workers=16,
                                   batch_size=batch_size,
                                   sampler=train_sampler
                                   )
    criterion = CtdetLoss()
    optimizer = torch.optim.Adam(model.parameters(), init_lr)
    for epoch in range(start_epoch+1,end_epoch+1):
        # Must be called on the loader's own sampler: calling it on a freshly
        # built DistributedSampler has no effect on the loader, which would
        # then replay the same sample order every epoch.
        train_sampler.set_epoch(epoch)
        adjust_lr(optimizer,epoch,init_lr)
        train(train_data_loader,model,criterion,optimizer,device,epoch,end_epoch)
        if epoch % 10 == 0 and local_rank==0:
            # NOTE(review): `save_dir` is not defined in the visible script;
            # confirm where it is supposed to come from.
            save_model(save_dir + '/model_last_{}.pth'.format(epoch),epoch,model,optimizer = None)

def train(train_data_loader,model,criterion,optimizer,device,epoch,end_epoch):
    """Run one training epoch in this process; only local rank 0 prints progress.

    Args:
        train_data_loader: sized iterable yielding dict-like batches that
            contain at least an 'input' entry.
        model: network to optimise; switched to training mode first.
        criterion: callable ``criterion(output, batch)`` returning a mapping
            with a 'loss' entry.
        optimizer: optimizer stepped once per batch.
        device: device every batch entry except 'meta' is moved to.
        epoch: current epoch number (used for logging and the time estimate).
        end_epoch: last epoch number (used for the time estimate).
    """
    loss_meter = AverageMeter()
    model.train()
    for batch_idx, batch in enumerate(train_data_loader):
        began = time.time()
        # Everything except the 'meta' entry is moved to the target device.
        for name in batch:
            if name == 'meta':
                continue
            batch[name] = batch[name].to(device)
        outputs = model(batch['input'])
        loss = criterion(outputs, batch)['loss']
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_seconds = time.time() - began
        # Rough estimate: this batch's duration times the batches of all
        # epochs that follow the current one.
        eta_seconds = batch_seconds * (end_epoch - epoch) * len(train_data_loader)
        eta_text = time.strftime('%H:%M:%S', time.gmtime(eta_seconds))
        loss_meter.update(loss.item())
        if local_rank != 0:
            continue
        position = '[epoch:{},{}/{}]'.format(epoch, batch_idx, len(train_data_loader))
        print(position, 'current_loss:%.4f' % loss_meter.current,
              'average_loss:%.4f' % loss_meter.avg, 'ELA_time:', eta_text)

def adjust_lr(optimizer,epoch,init_lr,lr_step=(45,60)):
    """Apply the step learning-rate schedule for ``epoch``.

    When ``epoch`` is the k-th milestone in ``lr_step`` (counting from 1),
    the learning rate of every parameter group is set to
    ``init_lr * 0.1**k`` and the new value is printed. On any other epoch
    the optimizer is left untouched.

    Args:
        optimizer: object exposing ``param_groups``, a list of dicts that
            hold an 'lr' entry.
        epoch: current epoch number.
        init_lr: learning rate before any drop.
        lr_step: milestone epochs, in the order the drops should happen.
            The default is a tuple so no mutable object is shared between
            calls; any iterable of epochs is accepted.
    """
    milestones = list(lr_step)
    # Milestones are matched exactly, so a drop is applied only on the epoch
    # that equals a milestone.
    if epoch not in milestones:
        return
    lr = init_lr*(0.1**(milestones.index(epoch) + 1))
    print('Drop LR to',lr)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

def save_model(path,epoch,model,optimizer = None):
    """Write a checkpoint dict to ``path`` with ``torch.save``.

    The checkpoint holds 'epoch' and 'state_dict'; an 'optimizer' entry is
    added only when an optimizer is passed.

    Args:
        path: destination file.
        epoch: epoch number stored in the checkpoint.
        model: model to save. For an nn.DataParallel or
            DistributedDataParallel wrapper the state dict of the wrapped
            ``.module`` is stored, so both wrappers produce the same keys as
            the bare network.
        optimizer: optional optimizer whose ``state_dict()`` is stored too.
    """
    # DistributedDataParallel is not a subclass of nn.DataParallel, so it has
    # to be listed explicitly; otherwise the wrapper's own state dict (with
    # keys nested under 'module.') would be written.
    wrappers = (torch.nn.DataParallel, torch.nn.parallel.DistributedDataParallel)
    if isinstance(model, wrappers):
        state_dict = model.module.state_dict()
    else:
        state_dict = model.state_dict()
    data = {'epoch':epoch,'state_dict':state_dict}
    if optimizer is not None:
        data['optimizer'] = optimizer.state_dict()
    torch.save(data,path)

# Script entry point: start training only when the file is executed directly.
if __name__ == "__main__":
    main()

Can someone help me? This has been bothering me for a few days. Thank you very much!

Gradients are averaged (divided by the number of processes) when using nn.DistributedDataParallel. This is not the case when using nn.DataParallel. To make them equivalent to the output of nn.DataParallel, you can multiply them by the number of processes after the call to backward.

Thank you for your reply.
I am a bit confused. I think the loss is independent of the batch size; a larger batch size just makes the gradient more robust.
In my case, for nn.DataParallel the loss is divided by 32 (the total batch size across the 4 GPUs is 32); for nn.DistributedDataParallel the loss of a single process is divided by 8 (the per-GPU batch size is 8). At this point their gradients are the same, and even if the gradients are averaged afterwards (divided by the number of processes), the result is almost the same as in the former case.

1 Like

What’s in your CtdetLoss definition?

@Niu Do you remember what the issue was? Thanks.