RuntimeError: connect() timed out when using multiple nodes with DistributedDataParallel

I am having trouble running training on multiple GPUs across multiple nodes using DistributedDataParallel. I get RuntimeError: connect() timed out on node 2. The code works fine when I use just one node with multiple GPUs on that node. I am running the code inside a Docker image. I have pasted my code below, along with the steps I use to launch training.

batch_loader.py:

from torch.utils import data
import random
import os
import numpy as np
import torch
    
class TrainFolder(data.Dataset):
    def __init__(self, file_list):
        super(TrainFolder, self).__init__()

        # Pair each non-phase-recovered input file with its label file
        # (same path with "nonPR" replaced by "PR").
        self.images = []
        for x in file_list:
            if x == '':
                continue
            labelfile = x.replace("nonPR", "PR")
            self.images.append((x, labelfile))

        random.shuffle(self.images)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        image_file, label_file = self.images[index]
        img = np.load(image_file)
        lab = np.load(label_file)

        # HWC -> CHW
        img = np.rollaxis(img, 2, 0)
        lab = np.rollaxis(lab, 2, 0)

        # copy() makes the rolled arrays contiguous before wrapping them as tensors
        img = torch.from_numpy(img.copy())
        lab = torch.from_numpy(lab.copy())
        return img, lab

gan_network.py:

import math
import torch
import torch.nn as nn

def gen_initialization(m):
    if type(m) == nn.Conv2d:
        sh = m.weight.shape
        nn.init.normal_(m.weight, std=math.sqrt(2.0 / (sh[0]*sh[2]*sh[3])))
        nn.init.constant_(m.bias, 0)
    elif type(m) == nn.BatchNorm2d:
        nn.init.normal_(m.weight)
        nn.init.normal_(m.bias)

class TripleConv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(TripleConv, self).__init__()
        mid_ch = (in_ch + out_ch) // 2
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, mid_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU(),
            nn.Conv2d(mid_ch, mid_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU(),
            nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU()
        )
        self.conv.apply(gen_initialization)

    def forward(self, x):
        return self.conv(x)


class Down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(Down, self).__init__()
        self.triple_conv = TripleConv(in_ch, out_ch)
        self.avg_pool_conv = nn.AvgPool2d(2, 2)
        self.in_ch = in_ch
        self.out_ch = out_ch

    def forward(self, x):
        # Stash the pre-pooling activation so UNet.forward can reuse it as the
        # skip connection for the matching Up block.
        self.cache = self.triple_conv(x)
        # Zero-pad the input channels so the residual addition matches out_ch.
        pad = torch.zeros(x.shape[0], self.out_ch - self.in_ch, x.shape[2], x.shape[3], device=x.device)
        x = torch.cat((x, pad), dim=1)
        self.cache += x
        return self.avg_pool_conv(self.cache)


class Center(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(Center, self).__init__()

        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True),
            nn.ReLU()
        )
        self.conv.apply(gen_initialization)

    def forward(self, x):
        return self.conv(x)


class Up(nn.Module):
    def __init__(self, in_ch, out_ch):
        super(Up, self).__init__()
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear',
                                    align_corners=True)
        self.triple_conv = TripleConv(in_ch, out_ch)

    def forward(self, x, cache):
        x = self.upsample(x)
        x = torch.cat((x, cache), dim=1)
        x = self.triple_conv(x)
        return x


class UNet(nn.Module):
    def __init__(self, in_ch, first_ch=None):
        super(UNet, self).__init__()

        if not first_ch:
            first_ch = 32

        self.down1 = Down(in_ch, first_ch)
        self.down2 = Down(first_ch, first_ch*2)
        self.down3 = Down(first_ch*2, first_ch*4)
        self.down4 = Down(first_ch*4, first_ch*8)
        self.center = Center(first_ch*8, first_ch*8)
        self.up4 = Up(first_ch*8*2, first_ch*4)
        self.up3 = Up(first_ch*4*2, first_ch*2)
        self.up2 = Up(first_ch*2*2, first_ch)
        self.up1 = Up(first_ch*2, first_ch)
        self.output = nn.Conv2d(first_ch, in_ch, kernel_size=3, stride=1,
                                padding=1, bias=True)
        self.output.apply(gen_initialization)

    def forward(self, x):

        x = self.down1(x)
        x = self.down2(x)
        x = self.down3(x)
        x = self.down4(x)
        x = self.center(x)
        x = self.up4(x, self.down4.cache)
        x = self.up3(x, self.down3.cache)
        x = self.up2(x, self.down2.cache)
        x = self.up1(x, self.down1.cache)
        x = self.output(x)
        
        return x

pr_train_mp.py:

from configobj import ConfigObj
from tqdm import tqdm
import os
import gan_network
import glob
import random
import torch
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from tensorboardX import SummaryWriter
from batch_loader import TrainFolder
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import argparse
import torch.distributed as dist

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" 
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

def init_parameters():
    tc, vc = ConfigObj(), ConfigObj()
    tc.batch_size, vc.batch_size = 1, 1
    tc.n_channels, vc.n_channels = 6, 6
    tc.image_size, vc.image_size = 1024, 1024
    return tc, vc

def train(gpu, args):
    ############################################################
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=args.world_size,
        rank=rank
    )
    ############################################################
    torch.manual_seed(47)
    torch.backends.cudnn.benchmark = True
    netG = gan_network.UNet(6, first_ch=32)
    torch.cuda.set_device(gpu)
    netG.cuda(gpu)

    optimizerG = optim.Adam(netG.parameters(), lr=1e-4, betas=(0.9, 0.999))

    # Initialize MSELoss function
    criterion = nn.MSELoss().cuda(gpu)

    ###############################################################
    # Wrap the model
    netG = nn.parallel.DistributedDataParallel(netG, device_ids=[gpu])
    ###############################################################
    
    # Data loading code
    train_samples = glob.glob('/home/data/nas/Processed_Data/training_data/phase_recovery/110920/npyfiles/size_1024/train/*_nonPR.npy')
    valid_samples = glob.glob('/home/data/nas/Processed_Data/training_data/phase_recovery/110920/npyfiles/size_1024/valid/*_nonPR.npy')

    random.shuffle(train_samples)

    trainData = TrainFolder(train_samples)
    validData = TrainFolder(valid_samples)

    ################################################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        trainData, num_replicas=args.world_size, rank=rank)

    valid_sampler = torch.utils.data.distributed.DistributedSampler(
        validData, num_replicas=args.world_size, rank=rank)
    ################################################################
    train_config, valid_config = init_parameters()
    train_data_loader = DataLoader(dataset=trainData, num_workers=0, batch_size=train_config.batch_size, 
                                    drop_last=False, pin_memory=True, sampler=train_sampler)
    valid_data_loader = DataLoader(dataset=validData, num_workers=0, batch_size=valid_config.batch_size,
                                    drop_last=False, pin_memory=True, sampler=valid_sampler)

    niter = 10000
    for epoch in range(niter):
        train_sampler.set_epoch(epoch)
        valid_sampler.set_epoch(epoch)
        netG.train()
        for i, (images, labels) in enumerate(tqdm(train_data_loader)):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            images = images.float()
            labels = labels.float()

            netG.zero_grad()
            optimizerG.zero_grad()
            
            output = netG(images)
            errG_mse = criterion(output, labels)
            errG_mse.backward()
            
            optimizerG.step()

        netG.eval()
        with torch.no_grad():
            for i, (images, labels) in enumerate(tqdm(valid_data_loader)):
                images = images.cuda(non_blocking=True)
                labels = labels.cuda(non_blocking=True)
                images = images.float()
                labels = labels.float()

                G_output = netG(images)
                valid_errG_mse = criterion(G_output, labels)
        
        # Save only from rank 0, and unwrap DDP so the checkpoint keys have no "module." prefix
        if epoch % 3 == 0 and rank == 0:
            torch.save(netG.module.state_dict(), f'model/network_epoch{epoch}.pth')

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1, type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    args = parser.parse_args()
    #########################################################
    args.world_size = args.gpus * args.nodes                #
    os.environ['MASTER_ADDR'] = 'a.b.c.d'             #
    os.environ['MASTER_PORT'] = '110'                       #
    mp.spawn(train, nprocs=args.gpus, args=(args,))         #
    #########################################################


if __name__ == '__main__':
    main()

Steps followed to train:

  1. Launch my docker image on Node 1 and Node 2 using this command (the docker image name is aqusens-train):
    sudo docker run --network=host -it --shm-size=32G --rm --runtime=nvidia -v /home:/home aqusens-train

  2. On node 1 run the following command:
    python3 pr_train_mp.py -n 2 -g 1 -nr 0

  3. On node 2 run the following command:
    python3 pr_train_mp.py -n 2 -g 1 -nr 1

Node 1 is the root machine. I can run the code individually on each node.
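
With -n 2 -g 1, each node spawns a single training process, and the global rank is just the args.nr * args.gpus + gpu arithmetic from train(), spelled out below for this 2-node, 1-GPU-per-node setup:

nodes, gpus = 2, 1
world_size = nodes * gpus            # 2
# node 1: -nr 0  ->  rank = 0 * 1 + 0 = 0   (this process hosts the rendezvous)
# node 2: -nr 1  ->  rank = 1 * 1 + 0 = 1   (must connect to MASTER_ADDR:MASTER_PORT)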

Input image size (NCHW): (1, 6, 1024, 1024)

Node 1 environment:
PyTorch: 1.7.1, Ubuntu: 18.04, cuDNN: 7605, CUDA: 10.2

Node 2 environment:
PyTorch: 1.7.0, Ubuntu: 18.04, cuDNN: 7605, CUDA: 10.2

Just to make sure about how you initialize the process group: do you set
MASTER_ADDR to the address of node 1 on both nodes?
(Node 2 needs to know node 1's address before you launch it.)

https://pytorch.org/docs/stable/distributed.html#environment-variable-initialization
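
For reference, this is roughly what env:// initialization expects on both nodes. A minimal sketch for your 2-node, 1-GPU setup; NODE_RANK here is just a hypothetical stand-in for however you pass the node rank in:

import os
import torch.distributed as dist

# Identical on BOTH nodes: the address and port of node 1 (the master).
os.environ['MASTER_ADDR'] = 'a.b.c.d'   # node 1's IP
os.environ['MASTER_PORT'] = '110'       # must be reachable from node 2

rank = int(os.environ.get('NODE_RANK', '0'))   # 0 on node 1, 1 on node 2
dist.init_process_group(backend='nccl', init_method='env://',
                        world_size=2, rank=rank)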

I have copied the pr_train_mp.py file to both nodes. MASTER_ADDR is the same IP address (node 1's) on both nodes. Only the -nr argument differs between the two nodes.

OK, so your MASTER_ADDR points to node 1.

Could you check whether node 2 can actually talk to node 1, and whether it can reach your specified port 110 (e.g. check for firewalls)?
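
For example, a quick test from node 2 before launching training is to try opening a plain TCP connection to the master address and port. Just a sketch; substitute node 1's actual IP and your MASTER_PORT:

import socket

# Can node 2 open a TCP connection to node 1's rendezvous port?
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
try:
    s.connect(('a.b.c.d', 110))   # node 1's IP and MASTER_PORT
    print('master port reachable')
except OSError as e:
    print('cannot reach master:', e)
finally:
    s.close()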

I tried this. I ran the following commands in a terminal window on node 2.

$ ip route get 192.168.1.183 gave this output:
192.168.1.183 dev enp6s0 src 192.168.1.237 uid 1004
cache

$ telnet 192.168.1.183 110
Trying 192.168.1.183…
telnet: Unable to connect to remote host: Connection timed out

This means node 2 cannot connect to node 1. Is there a way to fix this?

I think this should be configured in your cluster settings (e.g. on AWS), and the Docker containers should also be given access to the proper NIC interfaces.

I am not a Docker expert, but maybe check something like this
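
One thing that sometimes helps inside containers is pinning NCCL and Gloo to the physical interface so they do not pick docker0 or another virtual interface. A sketch, assuming enp6s0 (from your ip route output above) is the right interface on both nodes:

# Set before init_process_group on both nodes; the interface name is taken
# from the ip route get output above and may differ on your machines.
os.environ['NCCL_SOCKET_IFNAME'] = 'enp6s0'
os.environ['GLOO_SOCKET_IFNAME'] = 'enp6s0'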

This turned out to be a firewall issue. I had to add allow rules for traffic to and from node 2 in the firewall settings of node 1 (the master node):

sudo ufw allow from 192.168.1.0/24

sudo ufw allow to 192.168.1.0/24
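
If you prefer not to open the whole subnet, a narrower rule for just the rendezvous port should also let init_process_group through, though NCCL itself opens additional connections on other ports between the nodes, so the subnet-wide rules above are the simpler option:

sudo ufw allow from 192.168.1.0/24 to any port 110 proto tcp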