`torch.distributed.barrier` used in multi-node distributed data-parallel training

Hello,
I was trying to improve one of my multi-node distributed training examples (https://leimao.github.io/blog/PyTorch-Distributed-Training/) by adding some torch.distributed.barrier so that I could do some multiprocess-unsafe actions, such as data download and folder creation. After adding the torch.distributed.barrier, the training could still be done on a single-node multi-GPU machine. However, it got halted on a multi-node multi-GPU machine. Can anyone suggest if it is a PyTorch bug or it is my problem? Thank you.
Here is also the modified script that has torch.distributed.barrier:

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np

def set_random_seeds(random_seed=0):
    """Seed every random number generator used by the script.

    Seeds Python's ``random`` module, NumPy and PyTorch with the same value,
    and configures cuDNN to use deterministic algorithms with auto-tuning
    (benchmark mode) switched off.

    Args:
        random_seed: Integer seed applied to all three generators.
    """
    # The three generators are independent, so the seeding order is irrelevant.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(random_seed)

    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def evaluate(model, device, test_loader):
    """Compute the classification accuracy of ``model`` over ``test_loader``.

    The model is switched to evaluation mode (and left in that mode); all
    batches are processed with gradient tracking disabled.

    Args:
        model: Module mapping a batch of inputs to per-class scores, with the
            class scores along dimension 1.
        device: Device each batch is moved to before the forward pass.
        test_loader: Iterable yielding batches whose first two items are the
            inputs and the integer labels.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when
        the loader yields no samples.
    """
    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            images, labels = batch[0].to(device), batch[1].to(device)
            # Index of the highest score per sample; ``.data`` is not needed
            # because gradients are already disabled here.
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise raise ZeroDivisionError.
    return correct / total if total else 0.0

def main():
    """Train ResNet-18 on CIFAR-10 with DistributedDataParallel (one process per GPU).

    Parses the command-line options, initializes the NCCL process group,
    creates the model directory, builds the CIFAR-10 datasets, wraps the model
    in DistributedDataParallel and runs the training loop. Every 10 epochs the
    process with local rank 0 evaluates the model and saves a checkpoint.

    Synchronization in this version: processes with ``local_rank != 0`` call
    ``torch.distributed.barrier()`` before the directory/dataset setup, and
    processes with ``local_rank == 0`` call it after the model is constructed,
    so each process calls the barrier exactly once.
    """

    num_epochs_default = 100
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")

    # Processes whose local rank is not 0 block here; local rank 0 skips this
    # call and runs the setup below first.
    # NOTE(review): barrier() only returns once every process in the group has
    # called it, and a multi-node launch has one local rank 0 per node --
    # confirm the asymmetric placement behaves as intended across nodes.
    if local_rank != 0:
        torch.distributed.barrier()

    # Create directories outside the PyTorch program
    # Only create directory in one process because it is not multiprocess safe
    # NOTE(review): exists() followed by makedirs() is a check-then-create
    # sequence; os.makedirs(model_dir, exist_ok=True) tolerates concurrent callers.
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # NOTE(review): the test set is built with the same random crop/flip
    # transform as the training set -- confirm that this is intended.
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform) 
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18(pretrained=False)

    # Local rank 0 joins the barrier here, after its setup is done; this is the
    # call that pairs with the barrier entered by the other local ranks above.
    if local_rank == 0:
        torch.distributed.barrier()

    # NOTE(review): torch.cuda.set_device(local_rank) is never called, so the
    # NCCL collectives above run before any device is selected explicitly --
    # confirm which GPU each process uses for them.
    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume == True:
        # Remap tensors stored for "cuda:0" onto this process's own GPU.
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    # NOTE(review): train_sampler.set_epoch(epoch) is never called; presumably
    # every epoch then sees the same sample ordering -- confirm.
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))
        
        # Save and evaluate model routinely
        if epoch % 10 == 0:
            # Only local rank 0 evaluates and writes the checkpoint; the other
            # processes go straight on to the training step below.
            # NOTE(review): with several nodes each node has its own local
            # rank 0 -- verify whether they write to the same model_filepath.
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        # evaluate() leaves the model in eval mode, so switch back every epoch.
        ddp_model.train()

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

if __name__ == "__main__":
    
    main()
1 Like

barrier() requires all processes in your process group to join, so this is incorrect:

if local_rank == 0:
        torch.distributed.barrier()

Remember, all collective APIs of torch.distributed (i.e. everything except the P2P APIs: send, recv, isend, irecv) require all processes in your process group, either the implicit global group or a subgroup created by torch.distributed.new_group, to execute them.

Will this solve your problem? Please have a test and respond. :blush:

Thank you @iffiX for the insightful response. I am not sure if I fully understood, but I do have:

    if local_rank != 0:
        torch.distributed.barrier()

earlier in the code. The purpose is to pause the execution of all the local ranks except for the first local rank, so that the first local rank can create the directory and download the dataset without conflicts. Once the first local rank has completed the download and directory creation, the rest of the local ranks can use the downloaded dataset and directory.

In your opinion, how should I modify my code in particular? Thank you.

Best,

Lei

just do:

torch.distributed.barrier()

without if,
since:

This collective blocks processes until the whole group enters this function

Thank you very much @iffiX. I removed the if statement, and here is the code I was running:

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np

def set_random_seeds(random_seed=0):
    """Seed every random number generator used by the script.

    Seeds Python's ``random`` module, NumPy and PyTorch with the same value,
    and configures cuDNN to use deterministic algorithms with auto-tuning
    (benchmark mode) switched off.

    Args:
        random_seed: Integer seed applied to all three generators.
    """
    # The three generators are independent, so the seeding order is irrelevant.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(random_seed)

    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def evaluate(model, device, test_loader):
    """Compute the classification accuracy of ``model`` over ``test_loader``.

    The model is switched to evaluation mode (and left in that mode); all
    batches are processed with gradient tracking disabled.

    Args:
        model: Module mapping a batch of inputs to per-class scores, with the
            class scores along dimension 1.
        device: Device each batch is moved to before the forward pass.
        test_loader: Iterable yielding batches whose first two items are the
            inputs and the integer labels.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when
        the loader yields no samples.
    """
    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            images, labels = batch[0].to(device), batch[1].to(device)
            # Index of the highest score per sample; ``.data`` is not needed
            # because gradients are already disabled here.
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise raise ZeroDivisionError.
    return correct / total if total else 0.0

def main():
    """Train ResNet-18 on CIFAR-10 with DistributedDataParallel (one process per GPU).

    Same training script as before, except that the barrier after model
    construction is now unconditional. The conditional barrier before the
    directory/dataset setup (``local_rank != 0``) is still present, so in this
    version local rank 0 calls ``torch.distributed.barrier()`` once while every
    other local rank calls it twice.
    """

    num_epochs_default = 100
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")

    # First barrier call, made only by processes whose local rank is not 0;
    # local rank 0 skips it and runs the setup below first.
    if local_rank != 0:
        torch.distributed.barrier()

    # Create directories outside the PyTorch program
    # Only create directory in one process because it is not multiprocess safe
    # NOTE(review): exists() followed by makedirs() is a check-then-create
    # sequence; os.makedirs(model_dir, exist_ok=True) tolerates concurrent callers.
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # NOTE(review): the test set is built with the same random crop/flip
    # transform as the training set -- confirm that this is intended.
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform) 
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18(pretrained=False)

    # The string literal below is the previous conditional barrier, disabled by
    # turning it into a bare string expression (it has no runtime effect).
    '''
    if local_rank != 0:
        torch.distributed.barrier()
    '''
    # Unconditional barrier: every process calls it. For local rank 0 this is
    # its only barrier call; for all other local ranks it is the second one.
    # NOTE(review): the number of barrier calls therefore differs between
    # ranks -- confirm whether this second call on non-zero local ranks can
    # ever be matched by local rank 0.
    torch.distributed.barrier()

    # NOTE(review): torch.cuda.set_device(local_rank) is never called, so the
    # NCCL collectives above run before any device is selected explicitly --
    # confirm which GPU each process uses for them.
    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume == True:
        # Remap tensors stored for "cuda:0" onto this process's own GPU.
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    # NOTE(review): train_sampler.set_epoch(epoch) is never called; presumably
    # every epoch then sees the same sample ordering -- confirm.
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))
        
        # Save and evaluate model routinely
        if epoch % 10 == 0:
            # Only local rank 0 evaluates and writes the checkpoint; the other
            # processes go straight on to the training step below.
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        # evaluate() leaves the model in eval mode, so switch back every epoch.
        ddp_model.train()

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

if __name__ == "__main__":
    
    main()

However, it still got halted.

My former implementation was actually inspired by the implementation from HuggingFace transformer. Here is how they were using the torch.distributed.barrier.



I did not run their code in person by the way.

Hey @leimao, which line caused the hang on rank0 and other ranks?

BTW, how did you launch the program? Are the following parameters used in this experiment? And it works if you remove the barrier + save/load code?

$ python -m torch.distributed.launch --nproc_per_node=8 --nnodes=2 --node_rank=0 --master_addr="192.168.0.1" --master_port=1234 resnet_ddp.py
$ python -m torch.distributed.launch --nproc_per_node=8 --nnodes=2 --node_rank=1 --master_addr="192.168.0.1" --master_port=1234 resnet_ddp.py

One more question, during the execution, did you set resume to True or False? I am not confident if ddp_model.load_state_dict can restore all DDP states properly. We don’t have tests covering that yet. It might be safer to save ddp_model.module and then reconstruct DDP instances from the loaded ddp_model.module.

1 Like

Thank you very much @mrshenli. I believe it got halted in the second torch.distributed.barrier. Because I could see the data download/preprocessing was successful on both nodes:

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified

I was using the following command to start this time, only changing the number of gpus from 8 to 4 for each node. It runs fine with or without barriers if I only train using one single node.

$ python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr="192.168.0.1" --master_port=1234 resnet_ddp.py
$ python -m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr="192.168.0.1" --master_port=1234 resnet_ddp.py

I have not tried removing the saving model code but will give it a shot.

For now, resume is always False during my test, i.e., it is always training from scratch. So we could safely ignore those code for now.

1 Like

To put it simply, if you just want process to execute mkdir, download, etc, then you should:

import torch
import argparse


def main():
    """Demo: two unconditional barriers around work done only by local rank 0.

    Every process calls ``torch.distributed.barrier()`` twice; between the two
    calls only the process with ``local_rank == 0`` prints its rank. After the
    second barrier every process prints an exit message.
    """
    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int)
    args = parser.parse_args()
    local_rank = args.local_rank
    
    # First barrier: executed by all processes.
    torch.distributed.barrier()

    # Section run by local rank 0 only; the other processes skip straight to
    # the second barrier.
    if local_rank == 0:
        print(local_rank)
    
    # Second barrier: executed by all processes, so nobody gets past this line
    # until local rank 0 has finished the section above.
    torch.distributed.barrier()

    print("{} exit".format(local_rank))


if __name__ == "__main__":
    main()

this will print:

0
0 exit
2 exit
1 exit3 exit

And should not

import torch
import argparse


def main():
    """Demo: asymmetric barrier placement (each process calls barrier once).

    Processes with ``local_rank != 0`` call ``torch.distributed.barrier()``
    before printing their rank; the process with ``local_rank == 0`` prints
    first and calls the barrier afterwards.
    """
    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int)
    args = parser.parse_args()
    local_rank = args.local_rank
    
    # Non-zero local ranks enter the barrier here, before their print.
    if local_rank != 0:
        torch.distributed.barrier()

    # Executed by every process: local rank 0 before its barrier call, the
    # other ranks after theirs.
    print(local_rank)
    
    # Local rank 0 enters the barrier here, after its print.
    if local_rank == 0:
        torch.distributed.barrier()

    print("{} exit".format(local_rank))


if __name__ == "__main__":
    main()

which will print

0
0 exit
2
2 exit
13
3 exit

1 exit

barrier is just a barrier: it requires all processes in the group to reach a barrier call, no matter where that call is placed. So the second snippet basically just delays all the other processes (except 0). Unless the code between the two barriers becomes a no-op (equivalent to return / pass) once any process (e.g. process 0) has executed it, you are not going to get your expected result.

And please make sure that your CUDA runtime has the same major and minor version as the CUDA version your torch was built with; 9 is not compatible with 10, so a mismatch is likely to cause issues when using “nccl” or CUDA tensor computations.

1 Like

Thank you very much for repeating all the experiments @iffiX. I wanted to download CIFAR-10 dataset using local rank 0, and once the local rank 0 has downloaded the dataset, local rank 1, 2, and 3 could proceed and use the downloaded cache for data preprocessing.

    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform) 
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

However, I don’t see your solution,

import torch
import argparse


def main():
    """Demo (quoted): two unconditional barriers around work done only by local rank 0.

    Every process calls ``torch.distributed.barrier()`` twice; between the two
    calls only the process with ``local_rank == 0`` prints its rank. After the
    second barrier every process prints an exit message.
    """
    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int)
    args = parser.parse_args()
    local_rank = args.local_rank
    
    # First barrier: executed by all processes.
    torch.distributed.barrier()

    # Section run by local rank 0 only.
    if local_rank == 0:
        print(local_rank)
    
    # Second barrier: executed by all processes, after local rank 0's section.
    torch.distributed.barrier()

    print("{} exit".format(local_rank))


if __name__ == "__main__":
    main()

in particular, is able to do this.
The printout of your second code snippet, in particular,

def main():
    """Demo (quoted): asymmetric barrier placement (each process calls barrier once).

    Processes with ``local_rank != 0`` call ``torch.distributed.barrier()``
    before printing their rank; the process with ``local_rank == 0`` prints
    first and calls the barrier afterwards.
    """
    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int)
    args = parser.parse_args()
    local_rank = args.local_rank
    
    # Non-zero local ranks enter the barrier here, before their print.
    if local_rank != 0:
        torch.distributed.barrier()

    # Executed by every process: local rank 0 before its barrier call, the
    # other ranks after theirs.
    print(local_rank)
    
    # Local rank 0 enters the barrier here, after its print.
    if local_rank == 0:
        torch.distributed.barrier()

    print("{} exit".format(local_rank))


if __name__ == "__main__":
    main()

is expected, and it is also what I was trying to implement. I want local rank 0 to do all the stuff once, and then local ranks 1, 2, and 3 start to do the stuff in their own processes.

I think my CUDA version is compatible with PyTorch. I am using CUDA 10.2 + PyTorch 1.5.1.

The “asynchronous barrier” was also used in the HuggingFace example that I mentioned above. Since many people are using HuggingFace, I think their code at least runs fine on single node.

I thought of an inelegant way to get around it:

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np

def set_random_seeds(random_seed=0):
    """Seed every random number generator used by the script.

    Seeds Python's ``random`` module, NumPy and PyTorch with the same value,
    and configures cuDNN to use deterministic algorithms with auto-tuning
    (benchmark mode) switched off.

    Args:
        random_seed: Integer seed applied to all three generators.
    """
    # The three generators are independent, so the seeding order is irrelevant.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(random_seed)

    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def evaluate(model, device, test_loader):
    """Compute the classification accuracy of ``model`` over ``test_loader``.

    The model is switched to evaluation mode (and left in that mode); all
    batches are processed with gradient tracking disabled.

    Args:
        model: Module mapping a batch of inputs to per-class scores, with the
            class scores along dimension 1.
        device: Device each batch is moved to before the forward pass.
        test_loader: Iterable yielding batches whose first two items are the
            inputs and the integer labels.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when
        the loader yields no samples.
    """
    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            images, labels = batch[0].to(device), batch[1].to(device)
            # Index of the highest score per sample; ``.data`` is not needed
            # because gradients are already disabled here.
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise raise ZeroDivisionError.
    return correct / total if total else 0.0

def main():
    """Train ResNet-18 on CIFAR-10 with DistributedDataParallel (one process per GPU).

    Workaround variant: instead of conditional barriers, the process with
    local rank 0 creates the model directory and constructs (downloading if
    needed) the CIFAR-10 datasets first; then every process calls a single
    unconditional ``torch.distributed.barrier()``, after which every process
    constructs the datasets again. The rest of the script (DDP wrapping,
    optional resume, training loop with periodic evaluation and checkpointing
    on local rank 0) is unchanged.
    """

    num_epochs_default = 100
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")

    # torch.distributed.barrier()
    # Create directories outside the PyTorch program
    # Only create directory in one process because it is not multiprocess safe
    # NOTE(review): a multi-node launch has one local rank 0 per node, so this
    # branch runs once per node -- confirm that is acceptable if the nodes
    # share a filesystem (os.makedirs(model_dir, exist_ok=True) would be
    # tolerant of concurrent callers).
    if local_rank == 0:
        if not os.path.exists(model_dir):
            os.makedirs(model_dir)

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])


    # Local rank 0 constructs the datasets (download=True) ahead of the others
    # so the files are in place before the barrier releases the other ranks.
    if local_rank == 0:
        train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform) 
        test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)
        
    # Unconditional barrier: every process calls it exactly once.
    # NOTE(review): torch.cuda.set_device(local_rank) is never called before
    # this NCCL collective -- confirm which GPU each process uses for it.
    torch.distributed.barrier()

    # Every process (local rank 0 included, for the second time) constructs
    # the datasets here.
    # NOTE(review): the test set is built with the same random crop/flip
    # transform as the training set -- confirm that this is intended.
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform) 
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

    model_filepath = os.path.join(model_dir, model_filename)


    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18(pretrained=False)

    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume == True:
        # Remap tensors stored for "cuda:0" onto this process's own GPU.
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    # NOTE(review): train_sampler.set_epoch(epoch) is never called; presumably
    # every epoch then sees the same sample ordering -- confirm.
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))
        
        # Save and evaluate model routinely
        if epoch % 10 == 0:
            # Only local rank 0 evaluates and writes the checkpoint; the other
            # processes go straight on to the training step below.
            # NOTE(review): evaluation goes through the DDP wrapper on a subset
            # of the processes only -- confirm this cannot leave the other
            # ranks waiting in a collective.
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        # evaluate() leaves the model in eval mode, so switch back every epoch.
        ddp_model.train()

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

if __name__ == "__main__":
    
    main()

But it still got stuck.
On node 0:

100.0%Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Local Rank: 3, Epoch: 0, Training ...
Local Rank: 2, Epoch: 0, Training ...
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 0, Epoch: 0, Training ...

On node 1:

100.0%Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified

@mrshenli I commented the model saving code but still got halted.

After reading your code a little more carefully, I agree that you may use the second solution, since all processes need to create the data loader; so the problem is not there.
Could you please try to add some printing functions such as:

print("line230")
...
print("line232")

to show exactly where your code has halted? The current log is way too limited to determine the exact statement which caused your code to halt.
And don’t forget to take care of ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location)) after solving the halting issue, as @mrshenli said.

@mrshenli In your tutorial (https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#save-and-load-checkpoints), I saw you were using ddp_model.load_state_dict to load model parameters. Is this method untested and unfavored?
I remember the example I documented in my blog post works perfectly. I tested model resuming a while ago and it worked fine. It’s having problems only when I tried to add some barrier functions a few days ago.
Thank you.

@iffiX @mrshenli It seems that I have located where the halting is happening. Running the following code:

import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import argparse
import os
import random
import numpy as np

def set_random_seeds(random_seed=0):
    """Seed every random number generator used by the script.

    Seeds Python's ``random`` module, NumPy and PyTorch with the same value,
    and configures cuDNN to use deterministic algorithms with auto-tuning
    (benchmark mode) switched off.

    Args:
        random_seed: Integer seed applied to all three generators.
    """
    # The three generators are independent, so the seeding order is irrelevant.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(random_seed)

    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def evaluate(model, device, test_loader):
    """Compute the classification accuracy of ``model`` over ``test_loader``.

    The model is switched to evaluation mode (and left in that mode); all
    batches are processed with gradient tracking disabled.

    Args:
        model: Module mapping a batch of inputs to per-class scores, with the
            class scores along dimension 1.
        device: Device each batch is moved to before the forward pass.
        test_loader: Iterable yielding batches whose first two items are the
            inputs and the integer labels.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when
        the loader yields no samples.
    """
    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            images, labels = batch[0].to(device), batch[1].to(device)
            # Index of the highest score per sample; ``.data`` is not needed
            # because gradients are already disabled here.
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # An empty loader would otherwise raise ZeroDivisionError.
    return correct / total if total else 0.0

def main():
    """Train ResNet-18 on CIFAR-10 with one DistributedDataParallel process per GPU.

    The script expects to be started by ``torch.distributed.launch``, which
    passes ``--local_rank`` to every process. Processes whose local rank is
    not 0 wait at a barrier while the local-rank-0 processes create the model
    directory and download the dataset. Every tenth epoch the local-rank-0
    process evaluates the model and saves a checkpoint to
    ``<model_dir>/<model_filename>``; ``--resume`` loads that checkpoint
    before training starts.
    """

    num_epochs_default = 100
    batch_size_default = 256 # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume

    # Bind this process to its own GPU before any NCCL collective is issued,
    # as the torch.distributed.launch documentation instructs, so that the
    # barriers below run on this process's device.
    torch.cuda.set_device(local_rank)

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")

    # Processes with a non-zero local rank wait here until the local-rank-0
    # processes reach their matching barrier further down.
    if local_rank != 0:
        torch.distributed.barrier()

    print("Local Rank: {} | Location: {}".format(local_rank, 0))

    # Create directories outside the PyTorch program
    # exist_ok avoids the check-then-create race when several processes
    # (e.g. on nodes sharing a filesystem) get here at about the same time.
    os.makedirs(model_dir, exist_ok=True)

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # NOTE(review): the test set shares the training transform, so evaluation
    # also sees random crops/flips -- confirm whether that is intended.
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=True, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=True, transform=transform)

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18(pretrained=False)

    print("Local Rank: {} | Location: {}".format(local_rank, 1))

    # Matching barrier for the local-rank-0 processes: releases the waiting
    # processes once directory creation and the download are done.
    if local_rank == 0:
        torch.distributed.barrier()

    print("Local Rank: {} | Location: {}".format(local_rank, 2))

    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    print("Local Rank: {} | Location: {}".format(local_rank, 2.1))
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
    print("Local Rank: {} | Location: {}".format(local_rank, 2.2))

    # We only save the model who uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume:
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)

    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)
    print("Local Rank: {} | Location: {}".format(local_rank, 2.3))

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))

        print("Local Rank: {} | Location: {}".format(local_rank, 3))

        # Save and evaluate model routinely
        if epoch % 10 == 0:
            if local_rank == 0:
                # Evaluate the wrapped module directly: only this process runs
                # the evaluation, and a forward pass through the DDP wrapper
                # may issue collective calls (e.g. buffer broadcasts) that the
                # other processes would not match at this point.
                accuracy = evaluate(model=ddp_model.module, device=device, test_loader=test_loader)
                # The checkpoint keeps the DDP wrapper's keys so that the
                # resume path above can load it with ddp_model.load_state_dict.
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        print("Local Rank: {} | Location: {}".format(local_rank, 4))

        # evaluate() leaves the module in eval mode, so switch back.
        ddp_model.train()

        # Without set_epoch the sampler yields the same ordering every epoch.
        train_sampler.set_epoch(epoch)

        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

# Script entry point: start training only when this file is executed directly.
if __name__ == "__main__":
    main()

For the node 0:

Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Local Rank: 0 | Location: 1
Local Rank: 0 | Location: 2
Local Rank: 2 | Location: 0
Local Rank: 3 | Location: 0
Local Rank: 1 | Location: 0
Local Rank: 0 | Location: 2.1
Local Rank: 0 | Location: 2.2
Local Rank: 0 | Location: 2.3
Local Rank: 0, Epoch: 0, Training ...
Local Rank: 0 | Location: 3
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Local Rank: 2 | Location: 1
Local Rank: 2 | Location: 2
Local Rank: 1 | Location: 1
Local Rank: 1 | Location: 2
Local Rank: 3 | Location: 1
Local Rank: 3 | Location: 2
Local Rank: 2 | Location: 2.1
Local Rank: 1 | Location: 2.1
Local Rank: 3 | Location: 2.1
Local Rank: 2 | Location: 2.2
Local Rank: 2 | Location: 2.3
Local Rank: 1 | Location: 2.2
Local Rank: 1 | Location: 2.3
Local Rank: 2, Epoch: 0, Training ...
Local Rank: 2 | Location: 3
Local Rank: 2 | Location: 4
Local Rank: 1, Epoch: 0, Training ...
Local Rank: 1 | Location: 3
Local Rank: 1 | Location: 4
Local Rank: 3 | Location: 2.2
Local Rank: 3 | Location: 2.3
Local Rank: 3, Epoch: 0, Training ...
Local Rank: 3 | Location: 3
Local Rank: 3 | Location: 4

For the node 1:

Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Local Rank: 0 | Location: 1
Local Rank: 0 | Location: 2
Local Rank: 2 | Location: 0
Local Rank: 3 | Location: 0
Local Rank: 1 | Location: 0
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Local Rank: 0 | Location: 2.1
Local Rank: 2 | Location: 1
Local Rank: 2 | Location: 2
Local Rank: 1 | Location: 1
Local Rank: 1 | Location: 2
Local Rank: 3 | Location: 1
Local Rank: 3 | Location: 2
Local Rank: 2 | Location: 2.1
Local Rank: 1 | Location: 2.1
Local Rank: 3 | Location: 2.1

So the second node got halted in

ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

Since you are running 1.5.1, I just dove into the 1.5.1 code and can verify that the newest DistributedDataParallel does have a _sync_params method, which will broadcast all parameters and buffers and then set the local params with the in-place operation set_:

def _sync_params(self):
        """Copy the first replica's parameters onto the other local replicas.

        Excerpt quoted from the PyTorch 1.5.1 DistributedDataParallel source.
        NOTE(review): only the branch taken when ``device_ids`` holds more
        than one entry is visible here; the excerpt appears to be truncated,
        so any cross-process synchronization is not shown.
        """
        with torch.no_grad():
            # only do intra-node parameters sync for replicated single-device
            # CUDA modules
            if self.device_ids and len(self.device_ids) > 1:
                # intra-node parameter sync
                # Broadcast the parameters of replica 0 to every listed device.
                result = torch.cuda.comm.broadcast_coalesced(
                    self.modules_params[0],
                    self.device_ids,
                    self.broadcast_bucket_size)
                # Skip entry 0 on both sides: replica 0 is the source.
                for tensors, module_params in zip(result[1:],
                                                  self.modules_params[1:]):
                    for tensor, param in zip(tensors, module_params):
                        # In-place rebinding of the replica parameter to the
                        # broadcast tensor.
                        param.set_(tensor)
                        # Assume we have just run the optimizer and zeroed the
                        # grads of the parameters on the root model. We need
                        # to zero the grads on all model replicas as well.
                        # This snippet is copied from torch.optim.Optimizer.
                        if param.grad is not None:
                            param.grad.detach_()
                            param.grad.zero_()

And _sync_params will be invoked when you perform a forward operation, if syncing is enabled:

def forward(self, *inputs, **kwargs):
        # Excerpt: only the start of the method is quoted here.
        # Parameters are re-synchronized before the forward computation, but
        # only when forward-time syncing is enabled on this wrapper.
        if self.require_forward_param_sync:
            self._sync_params()

So load_state_dict() should work, theoretically, because the newly loaded params will be broadcast to the other processes.
Sorry about my outdated knowledge above

I think your code is correct, there really isn’t any visible issue with:

    model = model.to(device)
    print("Local Rank: {} | Location: {}".format(local_rank, 2.1))
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

My knowledge is not enough to explain this behavior, some possible debug solutions:

  1. Does the “gloo” backend also halt?
  2. Insert some more print tracers into the PyTorch source code.

It is most likely a problem with NCCL, because DDP basically does these things during initialization:

  1. call dist._broadcast_coalesced to broadcast parameters to all groups

    dist._broadcast_coalesced is defined in torch/csrc/distributed/c10d/comm.cpp.
    However, since it is a private function, there is no indication of whether it is blocking, etc.; I only know that it is invoked by all processes.

  2. call _ddp_init_helper, which basically only does some local operations like:

    Initialization helper function that does the following:
    
         (1) replicating the module from device[0] to the other devices
         (2) bucketing the parameters for reductions
         (3) resetting the bucketing states
         (4) registering the grad hooks
         (5) passing a handle of DDP to SyncBatchNorm Layer
    

You can also check your NCCL installation, but this might not help you much if the “gloo” backend also halts:

:slightly_frowning_face: Sorry that I cannot help you more with this problem.