Use Distributed Data Parallel correctly

I am trying to run distributed data-parallel training on a single node with 3 GPUs to maximise GPU utilization, which is currently very low.
After following multiple tutorials, I ended up with the code below (I have tried to keep it to a minimal example; let me know if anything is not clear and I’ll add more), but when I run it, it exits without doing anything.
Lines starting with # stand in for code I have left out to keep the example minimal.

#All the required imports
#setting of environment variables

# Per-process training entry point handed to mp.spawn in main(): joins the
# process group, builds the loaders/model/optimizer, then runs the training
# and validation loops and saves a checkpoint.
# NOTE(review): the indentation below is inconsistent (8/9/10/12/15 columns),
# so the snippet does not parse as written; loop nesting has to be inferred.
def train(world_size, args):
        #setting of all the seeds for deterministic behaviour

         # NOTE(review): the parser in main() defines only --epochs, --gpus and
         # --nr, so args.gpu and args.world_size are not set by the visible code.
         rank = args.nr * args.gpus + args.gpu
         # 'env://' rendezvous; backend is NCCL.
         dist.init_process_group(backend='nccl', init_method='env://', world_size=args.world_size, rank=rank)
         # NOTE(review): this stores the world size and later uses it as the
         # device index in .cuda(gpu) -- confirm a device id was intended.
         gpu = args.world_size


         #data_transform = 
        # root_dir = 
         #image_dataset = dataset.ImageFolder(train_dir)
         
         # image_dataset, batch_size and train_sampler are not defined in the visible snippet.
         # NOTE(review): both loaders pass shuffle=True together with a sampler;
         # DataLoader documents these options as mutually exclusive -- verify.
         train_loader = torch.utils.data.DataLoader(image_dataset['train'], batch_size=batch_size,shuffle = True,num_workers = 0,sampler = train_sampler)
         # NOTE(review): the validation loader reuses train_sampler.
         valid_loader = torch.utils.data.DataLoader(image_dataset['valid'], batch_size=batch_size,shuffle = True,num_workers = 0,sampler= train_sampler)
         model_transfer = models.resnet18(pretrained=True)
         criterion = nn.CrossEntropyLoss().cuda(gpu)
         # Only the parameters of the final `fc` layer are given to the optimizer.
         optimizer = optim.SGD(model_transfer.fc.parameters(),lr = 0.001)
         # NOTE(review): model_transfer is not moved to a GPU anywhere above
         # before being wrapped with device_ids=[rank].
         ddp_model_transfer = DDP(model_transfer, device_ids = [rank])


          # Training loop.
          for epoch in range(1, args.epochs+1):
               # NOTE(review): iterates data_transfer['train'], a name not defined
               # in this snippet; train_loader built above is never used.
               for batch_idx, (data, target) in enumerate(data_transfer['train']):
            # use_cuda is not defined in the visible snippet; .cuda() without an
            # argument targets the current device rather than `rank`.
            if use_cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = ddp_model_transfer(data)
            loss = criterion(output,target)
            loss.backward()
            optimizer.step()
            
            # Incremental running mean of the batch losses (train_loss is not
            # initialised in the visible snippet).
            train_loss = train_loss + ((1 / (batch_idx + 1)) * (loss.data - train_loss))
## Validation
        # NOTE(review): `model` is not defined in this snippet; the network is
        # bound to model_transfer / ddp_model_transfer.
        model.eval()
        for batch_idx, (data, target) in enumerate(data_transfer['valid']):
            # move to GPU
            if use_cuda:
                data, target = data.cuda(), target.cuda()
            ## update the average validation loss
            output = ddp_model_transfer(data).to(rank)
            loss = criterion(output,target)
            # NOTE(review): unlike the training update, this uses `+=`, so the
            # updated running mean is added on top of valid_loss instead of
            # replacing it.
            valid_loss +=  valid_loss + ((1 / (batch_idx + 1)) * (loss.data - valid_loss))
            # Index of the largest output along dim 1 is taken as the prediction.
            pred = output.data.max(1, keepdim=True)[1]
            # Count predictions that equal the target.
            correct += np.sum(np.squeeze(pred.eq(target.data.view_as(pred))).cpu().numpy())
            total += data.size(0)

            # NOTE(review): this check sits at the batch-loop indentation, so as
            # written it runs for every validation batch; valid_loss_min is not
            # initialised in the visible snippet.
            if valid_loss <= valid_loss_min:
                  print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
                  valid_loss_min,valid_loss))
                  # Saves the DDP wrapper's state_dict; every process that reaches
                  # this line writes the same file name.
                  torch.save(ddp_model_transfer.state_dict(), 'case_1_model.pt')
                  valid_loss_min = valid_loss

def main():
    """Parse the command-line options and spawn the training processes.

    Defines --epochs, -g/--gpus and -nr/--nr, then starts `world_size`
    (hard-coded to 3) processes running `train` via mp.spawn and waits for
    them to finish (join=True).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', default=2, type=int,metavar = 'N', help='number of total epochs to run')
    parser.add_argument('-g', '--gpus', default=1, type=int,help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,help='ranking within the nodes')
    args = parser.parse_args()
    # Number of processes to launch; fixed rather than derived from --gpus.
    world_size = 3
    # NOTE(review): mp.spawn calls the target as fn(i, *args), i.e. with the
    # process index prepended, so train receives three positional arguments
    # while it is declared with two.
    mp.spawn(train,args = (world_size,args),nprocs = world_size,join=True)

# Script entry guard.
# NOTE(review): when a file is run as a script `__name__` is "__main__", not
# "main", so this condition is false and main() is never called.
if __name__=="main":
    main()





Hey @bing

I see a few errors in the code snippet:

  1. It is if __name__=="__main__" instead of if __name__=="main". I think this is also the reason why you see no output, as nothing is executed.
  2. mp.spawn will automatically provide the rank as the first argument to the target function, so the train function needs to take one additional argument before world_size. Please check the API doc here: https://pytorch.org/docs/stable/multiprocessing.html#torch.multiprocessing.spawn
  3. You don’t need to move the loss function to the GPU, as it does not contain any parameters.
    gpu = args.world_size
    ...
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    
  4. You need to use rank instead of the current device; otherwise all processes will use the same device if you didn’t set CUDA_VISIBLE_DEVICES or run torch.cuda.set_device(rank).
    if use_cuda:
                 data, target = data.cuda(), target.cuda()
    

I tried to fix all of the above and run your code, but then realized I don’t have the dataset. It would be helpful to have a minimal repro. Or, if it is hard to create a minimal repro, I would suggest expanding this example to create your own application.

2 Likes

Hello Mr. Shen,
Thanks a lot for replying.
I added all the suggested modifications and now, I am getting -
ValueError: Error initializing torch.distributed using env:// rendezvous: environment variable MASTER_ADDR expected, but not set
Full traceback -

Traceback (most recent call last):

 File "Distributed_code1.py", line 212, in <module>
    main()
  File "Distributed_code1.py", line 209, in main
    mp.spawn(train,args = (world_size,),nprocs = world_size,join=True)
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 200, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 158, in start_processes
    while not context.join():
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 119, in join
    raise Exception(msg)
Exception: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
    fn(i, *args)
  File "/workdir/script/Distributed_code1.py", line 31, in train
    dist.init_process_group(backend='nccl',init_method = 'env://', world_size=world_size, rank=rank)
  File "/opt/conda/lib/python3.6/site-packages/torch/distributed/distributed_c10d.py", line 393, in init_process_group
    store, rank, world_size = next(rendezvous_iterator)
  File "/opt/conda/lib/python3.6/site-packages/torch/distributed/rendezvous.py", line 159, in _env_rendezvous_handler
    raise _env_error("MASTER_ADDR")
ValueError: Error initializing torch.distributed using env:// rendezvous: environment variable MASTER_ADDR expected, but not set

I also tried running the code with python -m torch.distributed.launch mycode.py as suggested here, but it is not working.

Hey Mr. Bing,

If you use env:// as the init_method, it will look for the MASTER_ADDR and MASTER_PORT env vars. You can use something like init_method='tcp://ip:port' or set the env vars directly. E.g.:

    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
1 Like

Thank You.

I just added the suggested modification. Now, I am getting -
RuntimeError: NCCL error in: ../torch/lib/c10d/ProcessGroupNCCL.cpp:514, invalid usage, NCCL version 2.6.3
Full traceback -


Traceback (most recent call last):
  File "Distributed_code1.py", line 217, in <module>
    main()
  File "Distributed_code1.py", line 214, in main
    mp.spawn(train,args = (world_size,),nprocs = world_size,join=True)
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 200, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 158, in start_processes
    while not context.join():
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 119, in join
    raise Exception(msg)
Exception: 

-- Process 1 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
    fn(i, *args)
  File "/workdir/script/Distributed_code1.py", line 110, in train
    ddp_model_transfer = DDP(model_transfer, device_ids = [device])
  File "/opt/conda/lib/python3.6/site-packages/torch/nn/parallel/distributed.py", line 303, in __init__
    self.broadcast_bucket_size)
  File "/opt/conda/lib/python3.6/site-packages/torch/nn/parallel/distributed.py", line 485, in _distributed_broadcast_coalesced
    dist._broadcast_coalesced(self.process_group, tensors, buffer_size)
RuntimeError: NCCL error in: ../torch/lib/c10d/ProcessGroupNCCL.cpp:514, invalid usage, NCCL version 2.6.3

I also did a change in how the model is loading in the following way -

# Select GPU index 0 when CUDA is available, otherwise the CPU.
# NOTE(review): the index is hard-coded, so every spawned process would pick
# the same device rather than one derived from its rank -- confirm.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Move the model to that device before wrapping it in DDP.
model_transfer = model_transfer.to(device)
ddp_model_transfer = DDP(model_transfer, device_ids = [device])

Are you using system NCCL? As of today, the master is still on NCCL 2.4.8 IIUC. Let’s first get DDP running correctly and then deal with system NCCL. Can you try changing backend='nccl' to backend='gloo'?

Are you using system NCCL?

How to figure that out?

I changed to “gloo” and now I am getting -

AssertionError: DistributedDataParallel device_ids and output_device arguments only work with single-device CUDA modules, but got device_ids [device(type='cuda', index=0)], output_device None, and module parameters {device(type='cuda', index=0), device(type='cpu')}.

The error message means the model you passed to DDP ctor lives on both CPU and GPU. Is this intentional?

module parameters {device(type=‘cuda’, index=0), device(type=‘cpu’)}

No,
Below is my model loading process -

    # Load a pretrained ResNet-18 and choose GPU 0 (or the CPU if CUDA is unavailable).
    model_transfer = models.resnet18(pretrained=True)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Freeze every parameter whose name does not contain "bn".
    for name,param in model_transfer.named_parameters():
        if ("bn" not in name):
            param.requires_grad = False
    print(model_transfer.fc.in_features)
    # Replace the final fully connected layer with a new 3-output linear layer.
    n_inputs = model_transfer.fc.in_features
    model_transfer.fc = nn.Linear(n_inputs,3)
    # NOTE(review): nothing in this snippet moves model_transfer (or the newly
    # created fc layer) to `device` before it is passed to DDP.
    ddp_model_transfer = DDP(model_transfer, device_ids = [device])

If you replace

ddp_model_transfer = DDP(model_transfer, device_ids = [device])

with

ddp_model_transfer = DDP(model_transfer.to(device), device_ids = [device])

then, this error would disappear I think.

Honestly, I would recommend starting from a simpler version and gradually adding complexities to it. We could try:

  1. See if this example works in your env (after setting MASTER_ADDR and MASTER_PORT).
  2. Replace the model in that example with a resnet 18 pretrained model
  3. Adding data loaders
1 Like

Honestly, I would recommend starting from a simpler version and gradually adding complexities to it.

Okay, I just did the above using the example you provided. It is working now, even when I use backend='nccl' too.
But here is an issue, how to merge the metrics provided by different GPUs together. Now, I am getting 3 outputs like below -



Epoch: 1 	Training Loss: 1.044399 	Validation Loss: 21.517847

Test Accuracy: 58% (106/180)
Validation loss decreased (inf --> 21.517847).  Saving model ...
Epoch: 1 	Training Loss: 1.036141 	Validation Loss: 23.852943

Test Accuracy: 47% (86/180)
Epoch: 1 	Training Loss: 1.040907 	Validation Loss: 21.792797

Test Accuracy: 56% (101/180)
Validation loss decreased (inf --> 23.852943).  Saving model ...
Validation loss decreased (inf --> 21.792797).  Saving model ...
Epoch: 2 	Training Loss: 0.971241 	Validation Loss: 19.095814

Test Accuracy: 62% (113/180)
Validation loss decreased (inf --> 19.095814).  Saving model ...

wandb: Waiting for W&B process to finish, PID 30011
Epoch: 2 	Training Loss: 0.965157 	Validation Loss: 20.114439

Test Accuracy: 55% (99/180)
Epoch: 2 	Training Loss: 0.966274 	Validation Loss: 21.474331

Test Accuracy: 56% (101/180)
Validation loss decreased (inf --> 20.114439).  Saving model ...
Validation loss decreased (inf --> 21.474331).  Saving model ...
1 Like

Great!

For merging logs, I see a few options. There might be more.

  1. Let one rank do the printing (e.g., if rank == 0: ...)
  2. If you need logs from all ranks, you can redirect them to write to different files.
  3. Convert strings to tensors, and then use all_gather/gather to collect them (https://pytorch.org/docs/stable/distributed.html#torch.distributed.gather). This might be a little more complex than it sounds. This can serve as an example.
2 Likes

@bing it might be a good idea to try a framework. Lightning, Ignite, or Catalyst should take care of most of the boilerplate and get the DDP details right (not always easy).

Not following my own advice, I have iterated my own train script from the original PyTorch examples over various projects and experiments. It’s a bit sprawling, but fast and easy to hack once you are familiar. It supports both torch native and Nvidia Apex DDP. I haven’t done any multi-node work with it, but lots of multi-gpu single node. I know quite a few researchers and engineers have used it for training large nets on 4-8 GPU V100 systems. A version of it tailored for ImageNet w/ RandAugment, Mixup, Random Erasing here: https://github.com/rwightman/pytorch-image-models/blob/master/train.py

An example of hacking it for object detection training with MSCOCO: https://github.com/rwightman/efficientdet-pytorch/blob/master/train.py

Those scripts have no problem maxing out GPU utilization, so long as the drives for the dataset are fast enough, the data/network is large enough, and the CPU has enough cores for your augmentation/data decoding. I use a shell script for launching the distributed training mode of the train script to keep it simple.

3 Likes

Thank You. I am working on it and will get back to you, if it doesn’t work out.

1 Like

Thank You, I’ll try to implement and will get back , if I face any problem.

I can’t figure out what’s wrong here.

Traceback (most recent call last):
  File "train.py", line 14, in <module>
    cli_main()
  File "/data/fairseq/fairseq_cli/train.py", line 343, in cli_main
    distributed_utils.call_main(args, main)
  File "/data/fairseq/fairseq/distributed_utils.py", line 172, in call_main
    args.distributed_world_size,
  File "/data/wav/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 200, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/data/wav/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 158, in start_processes
    while not context.join():
  File "/data/wav/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 119, in join
    raise Exception(msg)
Exception:

-- Process 2 terminated with the following error:
Traceback (most recent call last):
  File "/data/wav/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
    fn(i, *args)
  File "/data/fairseq/fairseq/distributed_utils.py", line 154, in distributed_main
    main(args, **kwargs)
  File "/data/fairseq/fairseq_cli/train.py", line 65, in main
    task.load_dataset(valid_sub_split, combine=False, epoch=1)
  File "/data/fairseq/fairseq/tasks/audio_pretraining.py", line 110, in load_dataset
    for line in f:
  File "/data/wav/lib/python3.6/encodings/ascii.py", line 26, in decode
    return codecs.ascii_decode(input, self.errors)[0]
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 4924: ordinal not in range(128)

train.tsv :


/data/fairseq/IE/
1143-GR/M1IE041143GENC0005.wav	21440
1143-GR/M1IE041143DGSQ1143.wav	69120
1143-GR/M1IE041143GENC0004.wav	17920

Any thoughts?