Hi @ptrblck.
I’ve gotten a script with generated data. But I can’t seem to reproduce the error above. Now, I get a
‘’’
RuntimeError: Address already in use
‘’’
I looked at:
The world_size and rank are not hard coded as asked in that one.
I checked this out Multiprocessing failed with Torch.distributed.launch module - #5 by leo-mao which stated not to set world_size and rank but when I did that, I got an error that rank or world_size argument
was not found.
So here are the changes I made with reproducible data:
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torchvision
from torchvision import datasets, transforms, models, utils # add models to the list
from torchvision.utils import make_grid
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.multiprocessing as mp
import argparse
import os
from skimage import io, transform
from PIL import Image
#from img2vec_pytorch import Img2Vec
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler
import glob
import pandas as pd
import numpy as np
# ignore harmless warnings
import warnings
warnings.filterwarnings("ignore")
import shutil
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
import itertools
#from resources.plotcm import plot_confusion_matrix
import time
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from datetime import datetime
torch.cuda.device_count()
train_transform = transforms.Compose([
transforms.RandomRotation(10), # rotate +/- 10 degrees
transforms.RandomHorizontalFlip(), # reverse 50% of images
transforms.Resize(224), # resize shortest side to 224 pixels
transforms.CenterCrop(224), # crop longest side to 224 pixels at center
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])
])
test_transform = transforms.Compose([
transforms.Resize(224),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406],
[0.229, 0.224, 0.225])
])
rand_image_tensor = torch.rand(45000, 3, 224, 224)
rand_label_tensor = torch.rand(45000, 1, 1)
rand_numeric_tensor = torch.rand(45000,1, 8)
rand_categorical_tensor = torch.rand(45000, 1, 9)
cat_embedding = [(5, 3), (21, 11), (3, 2), (47, 24), (4, 2), (8, 4), (4, 2), (20, 10), (10, 5)]
print(rand_image_tensor.size(),
rand_label_tensor.size(),
rand_numeric_tensor.size(),
rand_categorical_tensor.size())
class image_Dataset():
'''
image class data set
'''
def __init__(self, transform = None):
'''
Args:
------------------------------------------------------------
data = dataframe
image = column in dataframe with absolute path to the image
label = column in dataframe that is the target classification variable
numerical_columns = numerical columns from data
categorical_columns = categorical columns from data
policy = ID variable
'''
# self.image_frame = data
# self.transform = transform
def __len__(self):
return len(rand_label_tensor)
def __getitem__(self, idx):
if torch.is_tensor(idx):
idx = idx.tolist()
label = self.rand_label_tensor
pic = self.rand_image_tensor
img = pic
numerical_data = self.rand_numeric_tensor
if self.transform:
image = self.transform(img)
categorical_data = self.rand_categorical_tensor
return image, label,numerical_data, categorical_data
data_train_loader = image_Dataset(transform = train_transform)
train_loader = DataLoader(data_train_loader, batch_size=10, shuffle = True)
def average_gradients(model):
""" Gradient averaging"""
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce_multigpu(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
data_train_loader = image_Dataset(transform = train_transform)
train_loader = DataLoader(data_train_loader, batch_size=10, shuffle = True)
class Image_Embedd(nn.Module):
def __init__(self, embedding_size):
'''
Args
---------------------------
embedding_size: Contains the embedding size for the categorical columns
num_numerical_cols: Stores the total number of numerical columns
output_size: The size of the output layer or the number of possible outputs.
layers: List which contains number of neurons for all the layers.
p: Dropout with the default value of 0.5
'''
super().__init__()
self.all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in embedding_size])
self.embedding_dropout = nn.Dropout(p = .04)
self.cnn = models.resnet50(pretrained=True)
self.cnn.fc = nn.Linear(self.cnn.fc.in_features, 1000)
self.fc1 = nn.Linear(1000, 1071)
self.fc2 = nn.Linear(1071, 128)
self.fc3 = nn.Linear(128, 2)
#define the foward method
def forward(self, image, x_numerical, x_categorical):
embeddings = []
for i, e in enumerate(self.all_embeddings):
embeddings.append(e(x_categorical[:,i]))
x = torch.cat(embeddings, 1)
x = self.embedding_dropout(x)
x1 = self.cnn(image)
x2 = x_numerical
x3 = torch.cat((x1, x2), dim = 1)
x4 = torch.cat((x, x3), dim = 1)
x4 = F.relu(self.fc2(x4))
x4 = self.fc3(x4)
x4 = F.log_softmax(x4)
return x4
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--nodes', default = 1, type = int, metavar='N',
help = 'number of data loading workers (default = 4)')
parser.add_argument('-g', '--gpus', default = 1, type = int,
help = 'number of gpus per node')
parser.add_argument('-nr', '--nr', default = 0, type = int,
help = 'ranking within the nodes')
parser.add_argument('--epochs', default = 2, type=int, metavar='N',
help = 'number of total epochs to run')
args = parser.parse_args()
args.world_size = args.gpus*args.nodes
os.environ['MASTER_ADDR'] = 'localhost' #'localhost'
os.environ['MASTER_PORT'] = '8888'
mp.spawn(train, nprocs=args.gpus, args=(args,))
def train(gpu, args):
epochs = 500
rank = args.nr*args.gpus + gpu
dist.init_process_group(backend ='nccl', init_method='env://', world_size = args.world_size, rank = rank)
torch.manual_seed(0)
model = Image_Embedd(embedding_size=train_categorical_embedding_sizes)
model.cuda(gpu)
# torch.cuda.set_device(gpu)
max_trn_batch = 11053
criterion = torch.nn.NLLLoss().cuda(gpu)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
#Wrap the model
model = nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank,])
model.cuda(gpu)
start = datetime.now()
total_step = len(train_loader)
for i in range(epochs):
for b, (image, label, numerical_data, categorical_data) in enumerate(train_loader):
image = image.cuda(non_blocking=True)
label = label.cuda(non_blocking=True)
numerical_data = numerical_data.cuda(non_blocking=True)
categorical_data = categorical_data.cuda(non_blocking=True)
optimizer.zero_grad()
#count batches
b += 1
#throttle teh batches
if b == max_trn_batch:
break
y_pred = model(image, numerical_data, categorical_data)
single_loss = criterion(y_pred, label)
# statistics
print("working")
single_loss.backward()
optimizer.step()
if (b+1) % 100 == 0 and gpu == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {.4f}'.format(
epoch + 1,
args.epochs,
b + 1,
total_step,
loss.item())
)
# aggregated_losses.append(single_loss.cpu().data.numpy())
# scheduler.step(single_loss)
if gpu == 0:
print("Training complete in: " + str(datetime.now() - start))
if __name__=='__main__':
main()