Getting Error with Distributed Data parallel

Hi @ptrblck.

I’ve gotten a script with generated data. But I can’t seem to reproduce the error above. Now, I get a
‘’’
RuntimeError: Address already in use
‘’’

I looked at:

The world_size and rank are not hard coded as asked in that one.

I checked this out Multiprocessing failed with Torch.distributed.launch module - #5 by leo-mao which stated not to set world_size and rank but when I did that, I got an error that rank or world_size argument was not found.

So here are the changes I made with reproducible data:

import torch

from   torch.autograd import Variable

import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

import torchvision
from torchvision import datasets, transforms, models, utils # add models to the list
from torchvision.utils import make_grid
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.multiprocessing as mp
import argparse

import os
from skimage import io, transform
from PIL import Image
#from img2vec_pytorch import Img2Vec
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler
import glob
import pandas as pd
import numpy as np


# ignore harmless warnings
import warnings
warnings.filterwarnings("ignore")
import shutil
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
import itertools
#from resources.plotcm import plot_confusion_matrix
import time
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from datetime import datetime

torch.cuda.device_count()

train_transform = transforms.Compose([
        transforms.RandomRotation(10),      # rotate +/- 10 degrees
        transforms.RandomHorizontalFlip(),  # reverse 50% of images
        transforms.Resize(224),             # resize shortest side to 224 pixels
        transforms.CenterCrop(224),         # crop longest side to 224 pixels at center
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])

test_transform = transforms.Compose([
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])

rand_image_tensor = torch.rand(45000, 3, 224, 224)
rand_label_tensor = torch.rand(45000, 1, 1)
rand_numeric_tensor = torch.rand(45000,1, 8)
rand_categorical_tensor = torch.rand(45000, 1, 9)
cat_embedding = [(5, 3), (21, 11), (3, 2), (47, 24), (4, 2), (8, 4), (4, 2), (20, 10), (10, 5)]

print(rand_image_tensor.size(),
     rand_label_tensor.size(),
     rand_numeric_tensor.size(),
     rand_categorical_tensor.size())

class image_Dataset():
    '''
    image class data set   
    
    '''
    def __init__(self, transform = None):
        '''
        Args:
        ------------------------------------------------------------
            data = dataframe
            image = column in dataframe with absolute path to the image
            label = column in dataframe that is the target classification variable
            numerical_columns =  numerical columns from data
            categorical_columns = categorical columns from data
            policy = ID variable
            
        '''
#         self.image_frame = data
#         self.transform = transform
        
    def __len__(self):
        return len(rand_label_tensor)
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
         
        label = self.rand_label_tensor
        pic = self.rand_image_tensor
        img = pic
        numerical_data = self.rand_numeric_tensor

        if self.transform:
            image = self.transform(img)
        categorical_data = self.rand_categorical_tensor
 
        return image, label,numerical_data, categorical_data

data_train_loader = image_Dataset(transform = train_transform)

train_loader = DataLoader(data_train_loader, batch_size=10, shuffle = True)

def average_gradients(model):
    """ Gradient averaging"""
    size = float(dist.get_world_size())

    for param in model.parameters():
        dist.all_reduce_multigpu(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size

data_train_loader = image_Dataset(transform = train_transform)

train_loader = DataLoader(data_train_loader, batch_size=10, shuffle = True)


class Image_Embedd(nn.Module):

    def __init__(self, embedding_size):
        '''
        Args
        ---------------------------
        embedding_size: Contains the embedding size for the categorical columns
        num_numerical_cols: Stores the total number of numerical columns
        output_size: The size of the output layer or the number of possible outputs.
        layers: List which contains number of neurons for all the layers.
        p: Dropout with the default value of 0.5
        
        '''
        super().__init__()    
        
        self.all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in embedding_size])
        self.embedding_dropout = nn.Dropout(p = .04)
        
        self.cnn = models.resnet50(pretrained=True)
        
        self.cnn.fc = nn.Linear(self.cnn.fc.in_features, 1000)
        self.fc1 = nn.Linear(1000, 1071)
        self.fc2 = nn.Linear(1071, 128)
        self.fc3 = nn.Linear(128, 2)
        
        
    #define the foward method
    def forward(self, image, x_numerical, x_categorical):
        
        embeddings = []
        for i, e in enumerate(self.all_embeddings):
            embeddings.append(e(x_categorical[:,i]))
            
        x = torch.cat(embeddings, 1)
        x = self.embedding_dropout(x)
        x1 = self.cnn(image)
        x2 = x_numerical
        
        x3 = torch.cat((x1, x2), dim = 1)
        x4 = torch.cat((x, x3), dim = 1)
        x4 = F.relu(self.fc2(x4))
        x4 = self.fc3(x4)
        x4 = F.log_softmax(x4)
        return x4

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default = 1, type = int, metavar='N',
                       help = 'number of data loading workers (default = 4)')
    parser.add_argument('-g', '--gpus', default = 1, type = int,
                        help = 'number of gpus per node')
    parser.add_argument('-nr', '--nr', default = 0, type = int,
                        help = 'ranking within the nodes')
    parser.add_argument('--epochs', default = 2, type=int, metavar='N',
                       help = 'number of total epochs to run')
    args = parser.parse_args()
    args.world_size = args.gpus*args.nodes
    os.environ['MASTER_ADDR'] =  'localhost'   #'localhost'
    os.environ['MASTER_PORT'] = '8888'
    mp.spawn(train, nprocs=args.gpus, args=(args,))

def train(gpu, args):
    epochs = 500
    rank = args.nr*args.gpus + gpu

    dist.init_process_group(backend ='nccl', init_method='env://', world_size = args.world_size, rank = rank)
    torch.manual_seed(0)
    model = Image_Embedd(embedding_size=train_categorical_embedding_sizes)
    model.cuda(gpu)
#     torch.cuda.set_device(gpu)
    max_trn_batch = 11053
    criterion = torch.nn.NLLLoss().cuda(gpu)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    #Wrap the model
    model = nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank,])
    
    
    model.cuda(gpu)
    
    
    start = datetime.now()
    total_step = len(train_loader)
    
    
    
    for i in range(epochs):
        for b, (image, label, numerical_data, categorical_data) in enumerate(train_loader):
            image = image.cuda(non_blocking=True)
            label = label.cuda(non_blocking=True)
            numerical_data = numerical_data.cuda(non_blocking=True)
            categorical_data = categorical_data.cuda(non_blocking=True)

            optimizer.zero_grad()

            #count batches
            b += 1

            #throttle teh batches
            if b == max_trn_batch:
                break

            y_pred = model(image, numerical_data, categorical_data)
            single_loss = criterion(y_pred, label)

            # statistics
            print("working")


            single_loss.backward()
            optimizer.step()

            if (b+1) % 100 == 0 and gpu == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {.4f}'.format(
                epoch + 1,
                args.epochs,
                    b + 1,
                    total_step,
                    loss.item())
                     )
        
    
#     aggregated_losses.append(single_loss.cpu().data.numpy())
#     scheduler.step(single_loss)


    if gpu == 0:
        print("Training complete in: " + str(datetime.now() - start))
        
if __name__=='__main__':
    main()