I have a simple ResNet that I want to train with NCCL across 2 nodes, 1 GPU per node. I use the NVIDIA Docker image that ships with PyTorch, NCCL and all the dependencies.
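Each node runs the script inside that container; I start it with something like this (image tag, mount path and flags are approximate, not the literal command):

docker run --gpus all --ipc=host -v /mnt:/mnt -it nvcr.io/nvidia/pytorch:23.07-py3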
However, I get this error on the slave node that connects to the master:
[3256] world_size = 2, rank = 1, backend=nccl
5906823ecc8f:3256:3256 [0] NCCL INFO cudaDriverVersion 12020
5906823ecc8f:3256:3256 [0] NCCL INFO Bootstrap : Using eth0:172.17.0.2<0>
5906823ecc8f:3256:3280 [0] NCCL INFO Plugin Path : /opt/hpcx/nccl_rdma_sharp_plugin/lib/libnccl-net.so
5906823ecc8f:3256:3280 [0] NCCL INFO P2P plugin IBext
5906823ecc8f:3256:3280 [0] NCCL INFO NET/IB : No device found.
5906823ecc8f:3256:3280 [0] NCCL INFO NET/IB : No device found.
5906823ecc8f:3256:3280 [0] NCCL INFO NET/Socket : Using [0]eth0:172.17.0.2<0>
5906823ecc8f:3256:3280 [0] NCCL INFO Using network Socket
5906823ecc8f:3256:3280 [0] misc/socket.cc:480 NCCL WARN socketStartConnect: Connect to 172.17.0.2<33229> failed : Software caused connection abort
5906823ecc8f:3256:3280 [0] NCCL INFO misc/socket.cc:561 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO misc/socket.cc:615 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO bootstrap.cc:270 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO init.cc:1303 -> 2
5906823ecc8f:3256:3280 [0] NCCL INFO group.cc:64 -> 2 [Async thread]
5906823ecc8f:3256:3256 [0] NCCL INFO group.cc:422 -> 2
5906823ecc8f:3256:3256 [0] NCCL INFO group.cc:106 -> 2
5906823ecc8f:3256:3256 [0] NCCL INFO comm 0x562ffdf76b90 rank 1 nranks 2 cudaDev 0 busId 2000 - Abort COMPLETE
Traceback (most recent call last):
File "/mnt/resnet.py", line 167, in <module>
main()
File "/mnt/resnet.py", line 103, in main
torch.distributed.barrier()
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 145, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 3553, in barrier
work = default_pg.barrier(opts=opts)
torch.distributed.DistBackendError: NCCL error in: /opt/pytorch/pytorch/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp:1164, unhandled system error (run with NCCL_DEBUG=INFO for details), NCCL version 2.18.1
ncclSystemError: System call (e.g. socket, malloc) or external library call failed or device error.
Last error:
socketStartConnect: Connect to 172.17.0.2<33229> failed : Software caused connection abort
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 3256) of binary: /usr/bin/python
Traceback (most recent call last):
File "/usr/local/bin/torchrun", line 33, in <module>
sys.exit(load_entry_point('torch==2.1.0a0+4136153', 'console_scripts', 'torchrun')())
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 797, in main
run(args)
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/run.py", line 788, in run
elastic_launch(
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/launcher/api.py", line 134, in __call__
return launch_agent(self._config, self._entrypoint, list(args))
File "/usr/local/lib/python3.10/dist-packages/torch/distributed/launcher/api.py", line 264, in launch_agent
raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError:
============================================================
resnet.py FAILED
------------------------------------------------------------
Failures:
<NO_OTHER_FAILURES>
------------------------------------------------------------
I run it with:
master_node_1 > torchrun --nproc_per_node=1 --nnodes=2 --node_rank=0 --master_addr=master_ip --master_port=1777 resnet.py
slave_node_2 > torchrun --nproc_per_node=1 --nnodes=2 --node_rank=1 --master_addr=master_ip --master_port=1777 resnet.py
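If I understand torchrun correctly, these flags should translate into roughly the following environment variables for the single worker process on each node (these are exactly the variables the script reads later):

master_node_1: RANK=0, LOCAL_RANK=0, WORLD_SIZE=2, MASTER_ADDR=master_ip, MASTER_PORT=1777
slave_node_2:  RANK=1, LOCAL_RANK=0, WORLD_SIZE=2, MASTER_ADDR=master_ip, MASTER_PORT=1777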
Initialization of the process group works, since I can see the printed output on both machines when I run this code:
torch.distributed.init_process_group("nccl",
                                     rank=world_rank,
                                     world_size=world_size)
print(
    f"[{os.getpid()}] " +
    f"world_size = {torch.distributed.get_world_size()}, " +
    f"rank = {torch.distributed.get_rank()}, " +
    f"backend={torch.distributed.get_backend()}"
)
but the code fails later, at the torch.distributed.barrier() call.
When I run it on a single node, it works and the model trains.
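For completeness, the single-node run that works looks roughly like this (same script, no second node):

single_node > torchrun --nproc_per_node=1 --nnodes=1 --node_rank=0 resnet.py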
My full code:
import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import argparse
import os
import random
import numpy as np
def set_random_seeds(random_seed=0):
    torch.manual_seed(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)
def evaluate(model, device, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    return accuracy
def main():
    if not torch.distributed.is_available():
        print("Distributed not available")
        return

    num_epochs_default = 10000
    batch_size_default = 256  # 1024
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    argv = parser.parse_args()

    local_rank = int(os.environ['LOCAL_RANK'])
    world_size = int(os.environ['WORLD_SIZE'])
    world_rank = int(os.environ['RANK'])

    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = 0.001
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume

    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    # torch.distributed.init_process_group(backend="nccl")
    # torch.distributed.init_process_group(backend="gloo")
    torch.distributed.init_process_group("nccl",
                                         rank=world_rank,
                                         world_size=world_size)
    print(
        f"[{os.getpid()}] " +
        f"world_size = {torch.distributed.get_world_size()}, " +
        f"rank = {torch.distributed.get_rank()}, " +
        f"backend={torch.distributed.get_backend()}"
    )
    torch.distributed.barrier()

    # Encapsulate the model on the GPU assigned to the current process
    model = torchvision.models.resnet18()
    device = torch.device("cuda:{}".format(local_rank))
    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)

    # We only save the model that uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume:
        map_location = {"cuda:0": "cuda:{}".format(local_rank)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    # Prepare dataset and dataloader
    transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # Data should be prefetched
    # Download should be set to False, because it is not multiprocess safe
    train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=False, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=False, transform=transform)

    # Restricts data loading to a subset of the dataset exclusive to the current process
    train_sampler = DistributedSampler(dataset=train_set)
    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
    # Test loader does not have to follow distributed sampling strategy
    test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    for epoch in range(num_epochs):
        print("Local Rank: {}, Epoch: {}, Training ...".format(local_rank, epoch))

        # Save and evaluate model routinely
        if epoch % 10 == 0:
            if local_rank == 0:
                accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                torch.save(ddp_model.state_dict(), model_filepath)
                print("-" * 75)
                print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                print("-" * 75)

        ddp_model.train()
        for data in train_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()


if __name__ == "__main__":
    main()
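In case it helps narrow things down, I believe the failure should be reproducible without the model at all; a minimal sketch (launched with the same torchrun commands as above) would be something like:

import os
import torch
import torch.distributed as dist

def main():
    # Read the variables that torchrun exports for each worker
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    world_rank = int(os.environ["RANK"])

    # Bind this process to its GPU before creating NCCL communicators
    torch.cuda.set_device(local_rank)
    dist.init_process_group("nccl", rank=world_rank, world_size=world_size)
    print(f"[{os.getpid()}] rank {world_rank}/{world_size} initialized")

    # The barrier is where the full script fails, so this alone should reproduce it
    dist.barrier()

    # A tiny all_reduce to confirm NCCL traffic actually flows between the nodes
    t = torch.ones(1, device=f"cuda:{local_rank}")
    dist.all_reduce(t)
    print(f"[{os.getpid()}] all_reduce result: {t.item()} (expected {world_size}.0)")

    dist.destroy_process_group()

if __name__ == "__main__":
    main()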