Error in using DistributedDataParallel

Hi, I am trying to use DistributedDataParallel for a toy example, but I got the following error when I passed the arguments. Here I want to have 4 nodes and 1 GPU per node.

import os
import argparse
from datetime import datetime

import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms


class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        
        super(ConvNet, self).__init__()
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

if __name__=='__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1,
                        type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-gi', '--gpui', default=4, type=int,
                        help='the index of gpu')
    
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=2, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    #########################################################
    args.world_size = args.gpus * args.nodes                 # total number of processes, i.e. GPUs across all nodes
    os.environ['MASTER_ADDR'] = '10.57.23.164'               # IP address of the node that runs the rank 0 process
    os.environ['MASTER_PORT'] = '8888'                       # free port on that node
    mp.spawn(train, nprocs=args.gpus, args=(args,))
    #########################################################


def train(i, args):

    ########################################
    rank = args.nr * args.gpus + args.gpui
    dist.init_process_group(backend='nccl',
                            init_method='env//',
                            world_size=args.world_size,
                            rank=rank)
    ###########################################
    
    torch.manual_seed(0)
    
    model = ConvNet()
    
    torch.cuda.set_device(args.gpui)
    
    model.to(args.gpui)
    
    batch_size = 100
    
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().to(args.gpui)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    #################################################
    
    # wrap the model
    model = nn.parallel.DistributedDataParallel(model,device_ids = [args.gpui])
    
    #####################################################################
    
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    
    ##################################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                    num_replicas=args.world_size,
                                                                    rank=rank)
    
    ################################################ 
    
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler = train_sampler)
    ###################################################

    start = datetime.now()
    
    total_step = len(train_loader)
    
    for epoch in range(args.epochs):
        
        for i, (images, labels) in enumerate(train_loader):
            images = images.to(args.gpui, non_blocking=True)
            labels = labels.to(args.gpui, non_blocking=True)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            if (i + 1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                    epoch + 1, 
                    args.epochs, 
                    i + 1, 
                    total_step,
                    loss.item())
                   )

    print("Training complete in: " + str(datetime.now() - start))

I ran the script with the following arguments and got this error:

%run mnist_dit.py -n 4 -g 1 -gi 0 -nr 0 

error:

Exception                                 Traceback (most recent call last)
/mnt/home/pytorch_projects/mnist_dit.py in <module>
    139     os.environ['MATSER_PORT']='8890'
    140 
--> 141     mp.spawn(train, nprocs=args.gpus, args=(args,))

~/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in spawn(fn, args, nprocs, join, daemon, start_method)
    198                ' torch.multiprocessing.start_process(...)' % start_method)
    199         warnings.warn(msg)
--> 200     return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')

~/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in start_processes(fn, args, nprocs, join, daemon, start_method)
    156 
    157     # Loop on join until it returns True or raises an exception.
--> 158     while not context.join():
    159         pass
    160 

~/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in join(self, timeout)
    117         msg = "\n\n-- Process %d terminated with the following error:\n" % error_index
    118         msg += original_trace
--> 119         raise Exception(msg)
    120 
    121 

Exception: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/home/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
    fn(i, *args)
  File "/mnt/home/pytorch_projects/mnist_dit.py", line 44, in train
    dist.init_process_group(backend='nccl',init_method='env//',world_size= args.word_size,rank = rank)
  File "/home/anaconda3/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 391, in init_process_group
    init_method, rank, world_size, timeout=timeout
  File "/home/anaconda3/lib/python3.7/site-packages/torch/distributed/rendezvous.py", line 79, in rendezvous
    raise RuntimeError("No rendezvous handler for {}://".format(result.scheme))
RuntimeError: No rendezvous handler for ://

It’s env:// instead of env//. But you should be able to omit the init_method argument, as you have already set MASTER_ADDR and MASTER_PORT.
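
For reference, here is a minimal sketch of what the corrected call could look like; it reuses the rank and args.world_size you already compute in train, and you would keep only one of the two calls:

# Option 1: fix the scheme typo ("env://", not "env//")
dist.init_process_group(backend='nccl',
                        init_method='env://',
                        world_size=args.world_size,
                        rank=rank)

# Option 2: drop init_method entirely; it defaults to "env://",
# which reads MASTER_ADDR / MASTER_PORT from the environment
dist.init_process_group(backend='nccl',
                        world_size=args.world_size,
                        rank=rank)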

BTW, could you please add a “distributed” tag to distributed-training-related questions, so that people working on the distributed package can get back to you promptly?


Thank you for your reply. Yes, I will do that; sorry, I didn’t know about it. I have a question regarding MASTER_ADDR and MASTER_PORT: how should I set them when I am working on a remote server?
Here I just used the values from the tutorial.

The IP address should be reachable by all processes in the group. Usually you can get that IP with ifconfig and then pick the NIC you would like to use.
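
For example (the address below is made up; substitute whatever ifconfig reports for the NIC you picked on the node that will run rank 0), every node would set the same values before calling init_process_group:

import os

os.environ['MASTER_ADDR'] = '192.168.1.10'   # hypothetical IP of the rank 0 node
os.environ['MASTER_PORT'] = '8888'           # any free port, identical on every node

If NCCL ends up binding to the wrong interface, setting the NCCL_SOCKET_IFNAME environment variable to the NIC name (e.g. eth0) is a common way to pin it.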

Ok, thank you for your answer.