I am having a problem running training on multiple GPUs across multiple nodes with DistributedDataParallel: I get RuntimeError: connect() timed out on Node 2. The code works fine when I use just one node with multiple GPUs on that node. I am running the code inside a Docker image. I have pasted my code below, along with the steps I use to launch training.
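Since connect() timed out suggests Node 2 cannot reach the master's rendezvous endpoint, one quick check is to try a raw TCP connection from Node 2 while the training script is already running on Node 1 (the env:// TCPStore only listens on MASTER_PORT once rank 0 has called init_process_group). This is a minimal sketch; the address and port are the same placeholder values used in pr_train_mp.py below:

# connectivity_check.py -- run on Node 2 while pr_train_mp.py is running on Node 1
import socket

MASTER_ADDR = 'a.b.c.d'  # placeholder, same value as in pr_train_mp.py
MASTER_PORT = 110        # same port the training script exports

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)  # seconds
try:
    sock.connect((MASTER_ADDR, MASTER_PORT))
    print('TCP connection to the master succeeded')
except (socket.timeout, OSError) as exc:
    print(f'cannot reach the master rendezvous endpoint: {exc}')
finally:
    sock.close()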
batch_loader.py:
from torch.utils import data
import random
import os
import numpy as np
import torch


class TrainFolder(data.Dataset):
    def __init__(self, file):
        super(TrainFolder, self).__init__()
        self.images = []
        fid = file
        for x in fid:
            if x == '':
                continue
            labelfile = x.replace("nonPR", "PR")
            info = (x, labelfile)
            self.images.append(info)
        random.shuffle(self.images)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image_file, label_file = self.images[index]
        img = np.load(image_file)
        lab = np.load(label_file)
        img = np.rollaxis(img, 2, 0)
        lab = np.rollaxis(lab, 2, 0)
        im = img.copy()
        lb = lab.copy()
        img = torch.from_numpy(im[:, :, :])
        lab = torch.from_numpy(lb[:, :, :])
        return img, lab
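For reference, the dataset can be smoke-tested in isolation before any distributed setup is involved. A minimal sketch, assuming a hypothetical directory of paired foo_nonPR.npy / foo_PR.npy files:

import glob
from torch.utils.data import DataLoader
from batch_loader import TrainFolder

# hypothetical directory; each foo_nonPR.npy needs a matching foo_PR.npy
files = glob.glob('/tmp/npy_samples/*_nonPR.npy')
dataset = TrainFolder(files)
loader = DataLoader(dataset, batch_size=1, shuffle=False)

img, lab = next(iter(loader))
print(img.shape, lab.shape)  # expected: (1, C, H, W) after the rollaxis calls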
gan_network.py:
import math
import torch
import torch.nn as nn


def gen_initialization(m):
    if type(m) == nn.Conv2d:
        sh = m.weight.shape
        nn.init.normal_(m.weight, std=math.sqrt(2.0 / (sh[0] * sh[2] * sh[3])))
        nn.init.constant_(m.bias, 0)
    elif type(m) == nn.BatchNorm2d:
        nn.init.normal_(m.weight)
        nn.init.normal_(m.bias)


class TripleConv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(TripleConv, self).__init__()
        mid_ch = (in_ch + out_ch) // 2
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, mid_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU(),
            nn.Conv2d(mid_ch, mid_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU(),
            nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU()
        )
        self.conv.apply(gen_initialization)

    def forward(self, x):
        return self.conv(x)


class Down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(Down, self).__init__()
        self.triple_conv = TripleConv(in_ch, out_ch)
        self.avg_pool_conv = nn.AvgPool2d(2, 2)
        self.in_ch = in_ch
        self.out_ch = out_ch

    def forward(self, x):
        self.cache = self.triple_conv(x)
        pad = torch.zeros(x.shape[0], self.out_ch - self.in_ch, x.shape[2], x.shape[3], device=x.device)
        x = torch.cat((x, pad), dim=1)
        self.cache += x
        return self.avg_pool_conv(self.cache)


class Center(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(Center, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU()
        )
        self.conv.apply(gen_initialization)

    def forward(self, x):
        return self.conv(x)


class Up(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(Up, self).__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear',
                                    align_corners=True)
        self.triple_conv = TripleConv(in_ch, out_ch)

    def forward(self, x, cache):
        x = self.upsample(x)
        x = torch.cat((x, cache), dim=1)
        x = self.triple_conv(x)
        return x


class UNet(nn.Module):
    def __init__(self, in_ch, first_ch=None):
        super(UNet, self).__init__()
        if not first_ch:
            first_ch = 32
        self.down1 = Down(in_ch, first_ch)
        self.down2 = Down(first_ch, first_ch * 2)
        self.down3 = Down(first_ch * 2, first_ch * 4)
        self.down4 = Down(first_ch * 4, first_ch * 8)
        self.center = Center(first_ch * 8, first_ch * 8)
        self.up4 = Up(first_ch * 8 * 2, first_ch * 4)
        self.up3 = Up(first_ch * 4 * 2, first_ch * 2)
        self.up2 = Up(first_ch * 2 * 2, first_ch)
        self.up1 = Up(first_ch * 2, first_ch)
        self.output = nn.Conv2d(first_ch, in_ch, kernel_size=3, stride=1,
                                padding=1, bias=True)
        self.output.apply(gen_initialization)

    def forward(self, x):
        x = self.down1(x)
        x = self.down2(x)
        x = self.down3(x)
        x = self.down4(x)
        x = self.center(x)
        x = self.up4(x, self.down4.cache)
        x = self.up3(x, self.down3.cache)
        x = self.up2(x, self.down2.cache)
        x = self.up1(x, self.down1.cache)
        x = self.output(x)
        return x
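As a quick sanity check of the architecture: the four Down stages each halve the spatial size and the four Up stages restore it, so the network maps (N, 6, H, W) back to (N, 6, H, W) for any H, W divisible by 16. A minimal CPU sketch:

import torch
from gan_network import UNet

net = UNet(in_ch=6, first_ch=32)
x = torch.randn(1, 6, 256, 256)  # smaller than the real 1024x1024 input, just for a fast check
with torch.no_grad():
    y = net(x)
print(y.shape)  # expected: torch.Size([1, 6, 256, 256])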
pr_train_mp.py:
from configobj import ConfigObj
from tqdm import tqdm
import os
import gan_network
import glob
import random
import torch
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from tensorboardX import SummaryWriter
from batch_loader import TrainFolder
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import argparse
import torch.distributed as dist

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"


def init_parameters():
    tc, vc = ConfigObj(), ConfigObj()
    tc.batch_size, vc.batch_size = 1, 1
    tc.n_channels, vc.n_channels = 6, 6
    tc.image_size, vc.image_size = 1024, 1024
    return tc, vc


def train(gpu, args):
    ############################################################
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=args.world_size,
        rank=rank
    )
    ############################################################
    torch.manual_seed(47)
    torch.backends.cudnn.benchmark = True
    netG = gan_network.UNet(6, first_ch=32)
    torch.cuda.set_device(gpu)
    netG.cuda(gpu)
    optimizerG = optim.Adam(netG.parameters(), lr=1e-4, betas=(0.9, 0.999))
    # Initialize the MSELoss function
    criterion = nn.MSELoss().cuda(gpu)
    ###############################################################
    # Wrap the model
    netG = nn.parallel.DistributedDataParallel(netG, device_ids=[gpu])
    ###############################################################
    # Data loading code
    train_samples = glob.glob('/home/data/nas/Processed_Data/training_data/phase_recovery/110920/npyfiles/size_1024/train/*_nonPR.npy')
    valid_samples = glob.glob('/home/data/nas/Processed_Data/training_data/phase_recovery/110920/npyfiles/size_1024/valid/*_nonPR.npy')
    random.shuffle(train_samples)
    trainData = TrainFolder(train_samples)
    validData = TrainFolder(valid_samples)
    ################################################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        trainData, num_replicas=args.world_size, rank=rank)
    valid_sampler = torch.utils.data.distributed.DistributedSampler(
        validData, num_replicas=args.world_size, rank=rank)
    ################################################################
    train_config, valid_config = init_parameters()
    train_data_loader = DataLoader(dataset=trainData, num_workers=0, batch_size=train_config.batch_size,
                                   drop_last=False, pin_memory=True, sampler=train_sampler)
    valid_data_loader = DataLoader(dataset=validData, num_workers=0, batch_size=valid_config.batch_size,
                                   drop_last=False, pin_memory=True, sampler=valid_sampler)
    niter = 10000
    for epoch in range(niter):
        train_sampler.set_epoch(epoch)
        valid_sampler.set_epoch(epoch)
        netG.train()
        for i, (images, labels) in enumerate(tqdm(train_data_loader)):
            images = images.cuda(non_blocking=True)  # was images.to(non_blocking=True), which never moves the tensor to the GPU
            labels = labels.cuda(non_blocking=True)
            images = images.float()
            labels = labels.float()
            netG.zero_grad()
            optimizerG.zero_grad()
            output = netG(images)
            errG_mse = criterion(output, labels)
            errG_mse.backward()
            optimizerG.step()
        netG.eval()
        with torch.no_grad():
            for i, (images, labels) in enumerate(tqdm(valid_data_loader)):
                images = images.cuda(non_blocking=True)
                labels = labels.cuda(non_blocking=True)
                images = images.float()
                labels = labels.float()
                G_output = netG(images)
                valid_errG_mse = criterion(G_output, labels)
        if epoch % 3 == 0 and gpu == 0:
            torch.save(netG.state_dict(), f'model/network_epoch{epoch}.pth')


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    args = parser.parse_args()
    #########################################################
    args.world_size = args.gpus * args.nodes                #
    os.environ['MASTER_ADDR'] = 'a.b.c.d'                   #
    os.environ['MASTER_PORT'] = '110'                       #
    mp.spawn(train, nprocs=args.gpus, args=(args,))         #
    #########################################################


if __name__ == '__main__':
    main()
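To isolate the timeout from the model and data pipeline, the env:// rendezvous can be exercised on its own. A minimal sketch, assuming the same MASTER_ADDR/MASTER_PORT values as above, with the rank passed on the command line:

# ddp_rendezvous_check.py -- exercises only init_process_group, nothing else.
# Run "python3 ddp_rendezvous_check.py 0" on Node 1 and "python3 ddp_rendezvous_check.py 1" on Node 2.
import os
import sys
import torch.distributed as dist

os.environ['MASTER_ADDR'] = 'a.b.c.d'  # placeholder, same as pr_train_mp.py
os.environ['MASTER_PORT'] = '110'

rank = int(sys.argv[1])
dist.init_process_group(backend='nccl', init_method='env://',
                        world_size=2, rank=rank)
print(f'rank {rank}: rendezvous succeeded')
dist.destroy_process_group()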
Steps followed to train:
- Launch my docker image on Node 1 and Node 2 using this command (the docker image name is aqusens-train):
sudo docker run --network=host -it --shm-size=32G --rm --runtime=nvidia -v /home:/home aqusens-train
- On Node 1, run the following command:
python3 pr_train_mp.py -n 2 -g 1 -nr 0
- On Node 2, run the following command (the resulting ranks are worked out below):
python3 pr_train_mp.py -n 2 -g 1 -nr 1
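With these flags, the script's rank arithmetic (rank = args.nr * args.gpus + gpu) resolves as:

Node 1: -n 2 -g 1 -nr 0  ->  world_size = 2*1 = 2, spawned gpu = 0, rank = 0*1 + 0 = 0
Node 2: -n 2 -g 1 -nr 1  ->  world_size = 2*1 = 2, spawned gpu = 0, rank = 1*1 + 0 = 1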
Node 1 is the master machine (MASTER_ADDR points to it). I can run the code individually on each node.
Input image size (NCHW): (1, 6, 1024, 1024)
Node 1 environment:
PyTorch: 1.7.1, Ubuntu: 18.04, cuDNN: 7605, CUDA: 10.2
Node 2 environment:
PyTorch: 1.7.0, Ubuntu: 18.04, cuDNN: 7605, CUDA: 10.2
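A quick way to confirm these numbers inside each container (torch.version.cuda and torch.backends.cudnn.version() are the standard PyTorch accessors):

python3 -c "import torch; print(torch.__version__, torch.version.cuda, torch.backends.cudnn.version())"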