NCCL failed to connect

I have a simple ResNet that I want to train with NCCL across 2 nodes, 1 GPU per node. I use the NVIDIA Docker image that comes with PyTorch, NCCL, and everything else.
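For reference, I start the container on each node roughly like this (the image tag and the mount below are placeholders written from memory, not my exact command):

docker run --gpus all -it --rm -v /mnt:/mnt nvcr.io/nvidia/pytorch:<tag>   # tag and flags approximate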

However, I get the following error on the slave node when it tries to connect to the master:

[3256] world_size = 2, rank = 1, backend=nccl
5906823ecc8f:3256:3256 [0] NCCL INFO cudaDriverVersion 12020
5906823ecc8f:3256:3256 [0] NCCL INFO Bootstrap : Using eth0:172.17.0.2<0>
5906823ecc8f:3256:3280 [0] NCCL INFO Plugin Path : /opt/hpcx/nccl_rdma_sharp_plugin/lib/libnccl-net.so
5906823ecc8f:3256:3280 [0] NCCL INFO P2P plugin IBext
5906823ecc8f:3256:3280 [0] NCCL INFO NET/IB : No device found.
5906823ecc8f:3256:3280 [0] NCCL INFO NET/IB : No device found.
5906823ecc8f:3256:3280 [0] NCCL INFO NET/Socket : Using [0]eth0:172.17.0.2<0>
5906823ecc8f:3256:3280 [0] NCCL INFO Using network Socket

5906823ecc8f:3256:3280 [0] misc/socket.cc:480 NCCL WARN socketStartConnect: Connect to 172.17.0.2<33229> failed : Software caused connection abort
5906823ecc8f:3256:3280 [0] NCCL INFO misc/socket.cc:561 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO misc/socket.cc:615 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO bootstrap.cc:270 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO init.cc:1303 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO group.cc:64 -> 2 [Async thread]
5906823ecc8f:3256:3256 [0] NCCL INFO group.cc:422 -> 2
5906823ecc8f:3256:3256 [0] NCCL INFO group.cc:106 -> 2
5906823ecc8f:3256:3256 [0] NCCL INFO comm 0x562ffdf76b90 rank 1 nranks 2 cudaDev 0 busId 2000 - Abort COMPLETE
Traceback (most recent call last):
  File "/mnt/resnet.py", line 167, in <module>
    main()
  File "/mnt/resnet.py", line 103, in main
    torch.distributed.barrier()
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 145, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 3553, in barrier
    work = default_pg.barrier(opts=opts)
torch.distributed.DistBackendError: NCCL error in: /opt/pytorch/pytorch/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp:1164, unhandled system error (run with NCCL_DEBUG=INFO for details), NCCL version 2.18.1
ncclSystemError: System call (e.g. socket, malloc) or external library call failed or device error.
Last error:
socketStartConnect: Connect to 172.17.0.2<33229> failed : Software caused connection abort
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 3256) of binary: /usr/bin/python
Traceback (most recent call last):
  File "/usr/local/bin/torchrun", line 33, in <module>
    sys.exit(load_entry_point('torch==2.1.0a0+4136153', 'console_scripts', 'torchrun')())
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 797, in main
    run(args)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 788, in run
    elastic_launch(
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/launcher/api.py", line 134, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/launcher/api.py", line 264, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:
============================================================
resnet.py FAILED
------------------------------------------------------------
Failures:
  <NO_OTHER_FAILURES>
------------------------------------------------------------

I run it with:

master_node_1 > torchrun --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr=master_ip --master_port=1777 resnet.py

slave_node_2 > torchrun --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr=master_ip --master_port=1777 resnet.py

The process group initialization itself seems to work, since I can see the printed text on both machines when I run this code:

torch.distributed.init_process_group("nccl",                             
                                         rank=world_rank, 
                                         world_size=world_size)

print(
        f"[{os.getpid()}] " +         
        f"world_size = {torch.distributed.get_world_size()}, " +
        f"rank = {torch.distributed.get_rank()}, " + 
        f"backend={torch.distributed.get_backend()}"
    )

but the code fails later, at the torch.distributed.barrier() call. When I run it on a single node, it works and the model trains.
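To rule out the rest of the training code, a stripped-down script like the sketch below (not part of my original script; launched with the same torchrun commands as above) should fail at the same barrier call:

import os
import torch

# Minimal sketch: same process group setup as in the full script below,
# followed by a single barrier and teardown.
torch.distributed.init_process_group(
    "nccl",
    rank=int(os.environ["RANK"]),
    world_size=int(os.environ["WORLD_SIZE"]),
)
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
print(f"[{os.getpid()}] rank {torch.distributed.get_rank()} initialized")

torch.distributed.barrier()   # the full script dies here with the NCCL socket error
print(f"[{os.getpid()}] barrier passed")

torch.distributed.destroy_process_group()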

My full code:

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np

def set_random_seeds(random_seed=0):

    torch.manual_seed(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)

def evaluate(model, device, test_loader):

    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total

    return accuracy

def main():

    if not torch.distributed.is_available():
        print("Distributed not available")
        return

    num_epochs_default = 10000
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"
    
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)    
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)    
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    
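    # torchrun sets LOCAL_RANK, WORLD_SIZE and RANK in the environment of every process it spawns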
    local_rank = int(os.environ['LOCAL_RANK'])
    world_size = int(os.environ['WORLD_SIZE'])
    world_rank = int(os.environ['RANK'])

    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = 0.001
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume

    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Initializes the distributed backend which will take care of sychronizing nodes/GPUs
    # torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")	
    torch.distributed.init_process_group("nccl",                             
                                         rank=world_rank, 
                                         world_size=world_size)

    print(
        f"[{os.getpid()}] " +         
        f"world_size = {torch.distributed.get_world_size()}, " +
        f"rank = {torch.distributed.get_rank()}, " + 
        f"backend={torch.distributed.get_backend()}"
    )


    torch.distributed.barrier()

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18()

    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume:
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # Data should be prefetched
    # Download should be set to be False, because it is not multiprocess safe
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=False, transform=transform) 
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=False, transform=transform)

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))
        
        # Save and evaluate model routinely
        if epoch % 10 == 0:
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        ddp_model.train()

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

if __name__ == "__main__":
    
    main()

If you try

export TORCH_SHOW_CPP_STACKTRACES=1
export NCCL_BLOCKING_WAIT=1

What errors do you see then?
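For example, on the slave node, with the rest of the command unchanged (NCCL_DEBUG=INFO is apparently already on, judging by the log above):

export TORCH_SHOW_CPP_STACKTRACES=1
export NCCL_BLOCKING_WAIT=1
export NCCL_DEBUG=INFO
torchrun --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr=master_ip --master_port=1777 resnet.py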