How to set environment variables in torch.nn.parallel.DistributedDataParallel?

Hi, I am trying to train my code with distributed data parallelism. I already trained the model using torch.nn.DataParallel, and now I want to see how much gain in training speed I can get with torch.nn.parallel.DistributedDataParallel, since I have read on numerous pages that it's better to use DistributedDataParallel. So I followed one of the examples, but I am not sure how to set the environment variables os.environ['MASTER_ADDR'] and os.environ['MASTER_PORT'], because I am using a cloud service and don't know which specific node my code gets allocated to for training. Can anyone help me set these variables?

MASTER_ADDR needs to be an IP that is routable from all processes in the group. If the question is how to get that IP and share it with the other processes, the cloud service provider might know better? I would imagine this is a common problem when configuring many distributed systems (e.g., finding the IP of the master node in a Hadoop cluster).

If the hostname is known in advance, I guess all processes can try to resolve that hostname and pass the resolved IP to MASTER_ADDR?
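For example, something along these lines (a minimal sketch; the hostname and port value are placeholders, not anything your cloud provider actually exposes):

import os
import socket

# Resolve a pre-known master hostname to an IP and export it before calling
# init_process_group(init_method='env://'). Every process must use the same
# values; 'my-master-hostname' and the port are illustrative.
os.environ['MASTER_ADDR'] = socket.gethostbyname('my-master-hostname')
os.environ['MASTER_PORT'] = '29500'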

BTW, could you please add a “distributed” tag to distributed-training related questions, so that people working on it can get back to you promptly.

Right.
I tried training the model using the launch utility, but I got no output. My training program (train.py) looks like:

def runTraining(i, args):
    torch.cuda.set_device(args.local_rank)
    torch.distributed.init_process_group(backend='nccl',
                                         init_method='env://')
    ....
    net = nn.parallel.DistributedDataParallel(net)

and the script is:
CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 ./src/train.py

  1. Since the IP was an issue, I assume you will train the model using multiple machines? If yes, --nnodes and --node_rank also need to be configured, see: https://github.com/pytorch/examples/tree/master/distributed/ddp

  2. Since each process can see two devices (CUDA_VISIBLE_DEVICES=0,1), you will need to set device_ids to make sure that each process exclusively works on one device, e.g., DistributedDataParallel(net, device_ids=[args.local_rank]). A minimal sketch of this setup follows below.
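Roughly like this (a sketch of the per-process setup; the nn.Linear model is just a stand-in for your real network):

import argparse
import torch
import torch.nn as nn

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)  # filled in by torch.distributed.launch
args = parser.parse_args()

# One process per GPU: pin this process to its device, then join the group.
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl', init_method='env://')

net = nn.Linear(10, 10).cuda()  # stand-in for the real model
net = nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank])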

No, I am trying to train it on a single node with multiple GPUs. I added device_ids so that each process is mapped to a single GPU, but again nothing happens: the program doesn't show any error or output.

By “no output” do you mean the program hangs or does it exit without executing your print statement?

I was able to address the above issue and am now facing a CUDA out of memory error. The program runs fine with nn.DataParallel. My guess is PyTorch is assigning the complete data batch (size 8) to one GPU. Below I have written the script; let me know if you spot any mistakes.

batch file: CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 train.py

training script:

import argparse

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

def runTraining(args):
    torch.cuda.set_device(args.local_rank)
    torch.distributed.init_process_group(backend='nccl',
                                         init_method='env://')
    .....
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)

    train_loader = DataLoader(train_set,
                              batch_size=batch_size,
                              num_workers=args.num_workers,
                              shuffle=(train_sampler is None),
                              sampler=train_sampler)
    .....
    if torch.cuda.is_available():
        net.cuda()
        softMax.cuda()
        CE_loss.cuda()
        Dice_loss.cuda()

    net = nn.parallel.DistributedDataParallel(
        net, device_ids=[args.local_rank], find_unused_parameters=True)
    .......
    for i in range(epoch):
        train_sampler.set_epoch(i)
    ....

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', dest='local_rank', type=int, default=0)
    parser.add_argument('--num_workers', default=4, type=int)
    args = parser.parse_args()
    runTraining(args)

My guess is PyTorch is assigning the complete data batch (size 8) to one GPU.

Yep, this is true. The DistributedSampler is a per-process concept, so if you spawn two processes, each process will construct its own DDP instance and DistributedSampler instance. Can you try reducing the per-process batch_size to batch_size / world_size?
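For example (a sketch reusing the names train_set, args, and train_sampler from your script; the global batch of 8 and the division are illustrative):

import torch.distributed as dist
from torch.utils.data import DataLoader

# Each process loads its own shard, so the effective global batch is
# per_process_batch_size * world_size. To match a DataParallel run with a
# global batch of 8 on 2 GPUs, give each process a batch of 4.
world_size = dist.get_world_size()
per_process_batch_size = 8 // world_size

train_loader = DataLoader(train_set,
                          batch_size=per_process_batch_size,
                          num_workers=args.num_workers,
                          sampler=train_sampler)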

@mrshenli I have some questions regarding num_workers in the DataLoader when the model is trained in a distributed scenario:

  1. Does DistributedSampler, or distributed training in general, show any specific behavior with multiple data-loading worker processes? In other words, do I need to reduce the number of data-loading workers when using distributed training? Does distributed training increase the number of data-loading workers?

  2. When we use DataParallel with num_workers > 1, are the data-loading workers assigned explicitly to specific GPUs, or do the data-loading worker processes run separately?

I was able to run the program with a reduced batch size via nn.parallel.DistributedDataParallel. I noticed the time per training step dropped from 1 s (nn.DataParallel) to 0.8 s (distributed); however, the loss per epoch is twice the loss I was getting from nn.DataParallel, and I am getting two losses at the end of each epoch (I imagine they are coming from the two processes). But shouldn't it be a single loss? Any thoughts on why the loss is twice as large, or will it do averaging in the back end? Also, is there any chance the training speed can be further improved using two nodes, each with a single GPU? Currently I am using a single node with two GPUs.

Output for nn.DataParallel:
[Training] Epoch: 0, LossG: 4.1266

Output with nn.Distributed:

[Training] Epoch: 0, LossG: 9.2402                                                                           
[Training] Epoch: 0, LossG: 9.2885

The loss will be local to each process, and DDP only synchronizes gradients, so there will be two losses. For more details, please see this note and this paper.
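If you want a single number for logging, you could average the local losses yourself, e.g. (a sketch, assuming the process group is already initialized and the loss values live on the current CUDA device):

import torch
import torch.distributed as dist

def global_average(local_loss):
    # Sum the per-process loss values and divide by the number of processes.
    t = torch.tensor([local_loss], device='cuda')
    dist.all_reduce(t, op=dist.ReduceOp.SUM)
    return (t / dist.get_world_size()).item()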

Any thoughts on why the loss is twice as large, or will it do averaging in the back end?

hmm… this is weird. If you are not using DataParallel or DistributedDataParallel (just the local model), how does the loss vary with batch size?
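As a quick local check (a sketch with a made-up criterion and random data, not your actual losses), you could compare the loss on the same samples at two batch sizes; with the default reduction='mean' the values should be on a similar scale:

import torch
import torch.nn as nn

criterion = nn.CrossEntropyLoss()   # reduction='mean' by default
logits = torch.randn(8, 5)
targets = torch.randint(0, 5, (8,))

print(criterion(logits, targets).item())            # batch of 8
print(criterion(logits[:4], targets[:4]).item())    # batch of 4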

Also, is there any chance the training speed can be further improved using two nodes, each with a single GPU?

With two servers, do you mean 2 GPUs per node or 2 GPUs in total? If it is the latter, probably not, as cross-machine communication can be slower than same-machine communication, especially if NVLink is available. If it is the former, it can be, but we first need to understand two things:

  1. What is the dominating delay in each training iteration? If it is the forward/backward/opt.step(), then scaling out can be helpful. Otherwise, if it is the dataloader, then using more machines won't help (a rough timing sketch to check this follows after this list).
  2. If you use a smaller batch size in local training, how much does it speed up each iteration? If the speedup is significant, then using more machines can help.
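Something like this could separate data-loading time from compute time (illustrative only; plug in your own forward/backward and train_loader):

import time
import torch

data_time, step_time = 0.0, 0.0
end = time.time()
for batch in train_loader:
    data_time += time.time() - end      # time spent waiting on the dataloader
    start = time.time()
    # forward / backward / optimizer.step() go here
    torch.cuda.synchronize()            # make the GPU timing meaningful
    step_time += time.time() - start    # time spent on compute
    end = time.time()
print(f'data: {data_time:.2f}s, compute: {step_time:.2f}s')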

I tried varying batch sizes (the same ones each DDP process uses); the loss is half of what I get from DDP.

hmm… DDP shouldn't have any impact on the forward pass, so the loss should be the same. Could you please share a self-contained script that reproduces this?

May I ask if you tried something like:

os.environ['MASTER_ADDR'] = 'localhost' # or '127.0.0.1'

?
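For a single node, both variables can point at the local machine, e.g. (the port is just an example of a free port):

import os

os.environ['MASTER_ADDR'] = '127.0.0.1'   # all processes run on the same machine
os.environ['MASTER_PORT'] = '29500'       # any free port, identical in every process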