Multi-GPU + Multi-Node DistributedDataParallel hangs

Hi

I’m experiencing an issue where distributed models launched with torch.distributed.launch and wrapped in DistributedDataParallel hang specifically for NCCL multi-node, multi-GPU training, but work fine for single-node multi-GPU and multi-node single-GPU training. Has anyone else experienced this?

In the multi-node, multi-GPU case, all GPUs are loaded with models (nvidia-smi reports GPU memory utilisation), but on reaching DistributedDataParallel, NCCL_DEBUG reports

“SECONDARY_ADDR:6582:6894 [1] NCCL INFO Call to connect returned Connection timed out, retrying
SECONDARY_ADDR:6581:6895 [0] NCCL INFO Call to connect returned Connection timed out, retrying” on the rank 1 variant running on device SECONDARY_ADDR.

For both single-node multi-GPU and multi-node single-GPU, however, the code proceeds past DistributedDataParallel without any issues, which is what makes this particularly perplexing.
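Since the log shows connect calls timing out, one quick sanity check is whether plain TCP connections between the nodes work at all. The sketch below is only an illustration: the helper name can_connect is made up here, and it tests general reachability of an address and port (by default the MASTER_ADDR and MASTER_PORT environment variables). NCCL opens its own sockets on whichever interface it selects, so a success here does not prove that NCCL itself can connect.

```python
import os
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections and unresolvable host names.
        return False


if __name__ == "__main__":
    # Fallback values are assumptions for running this outside the launcher.
    host = os.environ.get("MASTER_ADDR", "127.0.0.1")
    port = int(os.environ.get("MASTER_PORT", "29500"))
    print(host, port, can_connect(host, port))
```

Run from the secondary node while something is listening on the master port, this would show whether the timeout is specific to NCCL's interface choice or affects every connection between the two machines.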

The job is run via Slurm using torch 1.8.1+cu111 and nccl/2.8.3-cuda-11.1.1.


Key implementation details are as follows.

The batch script used to run the code has the key details:

export NPROCS_PER_NODE=2 # GPUs per node
export WORLD_SIZE=2 # Total nodes (total ranks = GPUs per node * WORLD_SIZE)

RANK=0
for node in $HOSTLIST; do
ssh $node "
module load nccl/2.8.3-cuda-11.1.1
python3 -m torch.distributed.launch --nproc_per_node=$NPROCS_PER_NODE --nnodes=$WORLD_SIZE --node_rank=$RANK --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT test.py > test_$RANK" &
RANK=$((RANK+1))
done
wait

The above is the multi-node multi-GPU configuration. For single-node multi-GPU it is modified to NPROCS_PER_NODE=2, WORLD_SIZE=1; for multi-node single-GPU, to NPROCS_PER_NODE=1, WORLD_SIZE=2.
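For reference, the rank layout these settings produce can be sketched as below. The function name rank_layout is only illustrative; the formula (global rank = node_rank * nproc_per_node + local_rank) is how torch.distributed.launch numbers processes. Note that WORLD_SIZE in the batch script holds the node count, whereas the WORLD_SIZE the launcher exports to each process is the total number of processes.

```python
def rank_layout(nnodes, nproc_per_node):
    """Return (world_size, mapping of (node_rank, local_rank) -> global rank)."""
    world_size = nnodes * nproc_per_node
    ranks = {
        (node_rank, local_rank): node_rank * nproc_per_node + local_rank
        for node_rank in range(nnodes)
        for local_rank in range(nproc_per_node)
    }
    return world_size, ranks


if __name__ == "__main__":
    # The multi-node multi-GPU configuration from the batch script.
    world_size, ranks = rank_layout(nnodes=2, nproc_per_node=2)
    print("total processes:", world_size)
    for (node_rank, local_rank), rank in sorted(ranks.items()):
        print(f"node {node_rank}, local rank {local_rank} -> global rank {rank}")
```

With two nodes and two GPUs per node, the processes on the second node get global ranks 2 and 3, so four processes have to rendezvous before DistributedDataParallel can proceed.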

Key details of test.py are

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
arg = parser.parse_args()

local_rank = arg.local_rank
torch.cuda.set_device(arg.local_rank)

torch.distributed.init_process_group(backend='nccl', init_method='env://')

model = model.cuda() #to(device)
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

train_sampler = DistributedSampler(dataset=train_set)
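As a side note on the argument handling above: with this launcher the local rank arrives as a --local_rank argument, and the LOCAL_RANK environment variable is set as well. A minimal sketch of reading either one follows. The helper name get_local_rank and the fallback to 0 when neither is present are assumptions made for illustration, not part of the original script.

```python
import argparse
import os


def get_local_rank(argv=None):
    """Read the local rank from --local_rank, falling back to $LOCAL_RANK, then 0."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--local_rank",
        type=int,
        # Evaluated on each call, so it reflects the current environment.
        default=int(os.environ.get("LOCAL_RANK", 0)),
        help="Local rank passed by torch.distributed.launch.",
    )
    # parse_known_args ignores any other arguments the script may accept.
    args, _unknown = parser.parse_known_args(argv)
    return args.local_rank


if __name__ == "__main__":
    print("local rank:", get_local_rank())
```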

While torch.distributed.launch has recently been deprecated and replaced with elastic_launch, moving to elastic_launch as a potential solution does not seem viable, due to its dependence on etcd, which I’m unable to install because of access privilege restrictions.

If anyone has any suggestions about how to resolve this, I would greatly appreciate your input.

Thanks

For anyone who comes up against this issue: for me, of the interfaces reported by ifconfig on each device, only one has an IP that matches the device's host name. Forcing that interface via NCCL_SOCKET_IFNAME fixed the hang, although runs are no longer consistent. With moderate probability, NCCL reports

RuntimeError: NCCL error in: /pytorch/torch/lib/c10d/ProcessGroupNCCL.cpp:761, internal error, NCCL version 2.7.8
ncclInternalError: Internal check failed. This is either a bug in NCCL or due to memory corruption

That it works at all (even with a little bit of management) is an improvement, but the stability is definitely an issue. Also, on repeated tests it appears that including the change to NCCL_SOCKET_IFNAME works for multi-GPU, multi-node, fails with a moderate probability on multi-node single-GPU, and fails with high probability on single-node single-GPU.

Likely a system issue, but rather a weird one.
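The interface selection described above could be automated roughly as follows. This is a Linux-only sketch under stated assumptions: the helper names interface_ipv4 and pick_interface are made up for illustration, the address lookup uses the SIOCGIFADDR ioctl, and it assumes the host name resolves to the address NCCL should use (on some systems it resolves to a loopback address instead, in which case the result should not be used).

```python
import os
import socket
import struct

SIOCGIFADDR = 0x8915  # Linux ioctl request: get an interface's IPv4 address


def interface_ipv4(ifname):
    """Return the IPv4 address bound to `ifname`, or None if it has none (Linux only)."""
    import fcntl  # Unix-only module, imported lazily

    request = struct.pack("256s", ifname.encode()[:15])
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            packed = fcntl.ioctl(sock.fileno(), SIOCGIFADDR, request)
        except OSError:
            return None
    return socket.inet_ntoa(packed[20:24])


def pick_interface(host_ip, iface_to_ip):
    """Return the name of the interface whose address equals `host_ip`, else None."""
    if host_ip is None:
        return None
    for name, ip in iface_to_ip.items():
        if ip == host_ip:
            return name
    return None


if __name__ == "__main__":
    try:
        host_ip = socket.gethostbyname(socket.gethostname())
    except OSError:
        host_ip = None  # host name does not resolve
    addresses = {name: interface_ipv4(name) for _, name in socket.if_nameindex()}
    ifname = pick_interface(host_ip, addresses)
    if ifname is not None:
        # Must be set before torch.distributed.init_process_group is called.
        os.environ["NCCL_SOCKET_IFNAME"] = ifname
    print(host_ip, addresses, ifname)
```

Setting the variable from inside the script only affects that process, so each rank would need to run the same logic, or the variable can be exported in the batch script instead.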

Could you share a simple, self-contained repro of your test.py file that we can run locally?

“While torch.distributed.launch has recently been deprecated and replaced with elastic_launch, moving to elastic_launch as a potential solution does not seem viable, due to the dependence on etcd which I’m unable to install due to access privilege restrictions.”

Hi! “torch.distributed.launch” is going to be deprecated, but we will still support master-based launches. We are going to have 2 modes for this:

  1. BC mode: nothing changes for users; they can still provide ranks and all other parameters as they do now
  2. Dynamic: the new launcher will be able to automatically derive ranks and world size, based on master addr and master port

Etcd will not be required to use the PyTorch launchers at all. Etcd is relevant for long-running jobs that require high reliability and must be able to withstand node failures.

Sure, thanks for that. This is the most basic implementation that reproduces the error on my system without the NCCL_SOCKET_IFNAME tweaks.

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os

def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    arg = parser.parse_args()

    # Bind this process to the GPU that matches its local rank.
    local_rank = arg.local_rank
    torch.cuda.set_device(arg.local_rank)

    print(os.environ["MASTER_PORT"], os.environ["MASTER_ADDR"], arg.local_rank, os.environ["LOCAL_RANK"], os.environ["RANK"], os.environ["WORLD_SIZE"])

    torch.distributed.init_process_group(backend='nccl', init_method='env://') #, world_size=int(os.environ["WORLD_SIZE"]), rank=int(os.environ["RANK"]))
    model = torchvision.models.resnet18(pretrained=False)

    model = model.cuda()

    # The hang occurs here in the multi-node multi-GPU configuration.
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # download=False: CIFAR-10 must already be present under ./data.
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=False, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=False, transform=transform)

    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=256, sampler=train_sampler, num_workers=8)
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-5)

    for epoch in range(5):
        ddp_model.train()

        for data in train_loader:
            inputs, labels = data[0].cuda(), data[1].cuda() #data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

if __name__ == "__main__":
    main()

Thanks for your offer of assistance (and to @aivanou for the comments on elastic - I will have to read more into elastic to better understand it).