Hi, I am trying to run a modified version of elastic_ddp.py, based on this tutorial. The source code is at the end of this post. The command to run it is:
$ torchrun --standalone --nnodes=1 --nproc_per_node=2 elastic_ddp.py
According to the documentation, gradients are automatically synchronized across GPUs as part of the loss.backward() call. To check that the model is indeed kept in sync across GPUs, I let rank 0 apply the optimizer step for only 2 batches, while rank 1 steps on all of its batches (see the training loop below). When evaluating the model on each GPU, I would expect both GPUs to report the same accuracy, even though one GPU contributed far fewer updates. However, when running the program, the evaluation results are:
- (device_id 0) Accuracy: 49.98
- (device_id 1) Accuracy: 95.49
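To make the check explicit, a small diagnostic could be called on each rank just before evaluation. This is my own sketch, not part of the tutorial: it broadcasts rank 0's copy of every parameter and prints the largest elementwise difference, which should be 0.0 on every rank if the replicas are really identical.

def report_param_divergence(ddp_model, device_id):
    # Compare each local parameter against rank 0's copy.
    with torch.no_grad():
        for name, p in ddp_model.named_parameters():
            ref = p.detach().clone()
            dist.broadcast(ref, src=0)  # overwrite ref with rank 0's values
            diff = (p - ref).abs().max().item()
            print(f"(device_id {device_id}) {name}: max diff vs rank 0 = {diff:.3e}")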
A possibly related problem: when using the NCCL backend, the program hangs on the call to the DDP constructor. After reducing the timeout of dist.init_process_group, the error message is: RuntimeError: DDP expects same model across all ranks, but Rank 0 has 4 params, while rank 1 has inconsistent 0 params.
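For reference, shortening the timeout looks like this (it mirrors the commented-out line in the code below; note that datetime must be imported):

dist.init_process_group("nccl", timeout=datetime.timedelta(seconds=10))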
So instead of NCCL I use the GLOO backend, which is not the recommended backend for GPU training on Linux, but it works (or at least seems to work).
Based on this link, where somebody else experienced problems with NCCL, I disabled AMD SVM (virtualization) and IOMMU in the BIOS, but that did not get the NCCL backend working.
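In case it helps to diagnose the hang, NCCL's own logging can also be enabled at launch via a standard NCCL environment variable (I have not pasted its output here):

$ NCCL_DEBUG=INFO torchrun --standalone --nnodes=1 --nproc_per_node=2 elastic_ddp.py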
The machine has an AMD Threadripper PRO 5995WX (64 cores) and 2x RTX 4090, running Ubuntu 22.04.
The code of elastic_ddp.py:
# Based on https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
import datetime  # needed if the init_process_group timeout below is uncommented
import os

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
class MnistModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
def demo_basic():
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12358"

    # initialize the process group
    #
    # dist.init_process_group("nccl", timeout=datetime.timedelta(seconds=10))
    # program hangs on call to DDP constructor when 'nccl' is used as backend
    # default timeout is 30 minutes; set to 10 seconds
    # error message after shortened timeout:
    # RuntimeError: DDP expects same model across all ranks, but Rank 0 has 4 params,
    # while rank 1 has inconsistent 0 params.
    dist.init_process_group("gloo")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    device_id = rank % torch.cuda.device_count()  # map each rank to one local GPU
    print(f"(device_id {device_id}) Start running basic DDP example.")

    # some double checks; this code can be removed
    print(f'(device_id {device_id}) Distributed is available: {torch.distributed.is_available()}')
    print(f'(device_id {device_id}) Default process group has been initialized: {torch.distributed.is_initialized()}')
    print(f'(device_id {device_id}) NCCL backend is available: {torch.distributed.is_nccl_available()}')
    print(f'(device_id {device_id}) Process was launched with torch elastic: {torch.distributed.is_torchelastic_launched()}')
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    # DistributedSampler distributes half of the data to rank 0 and half of the data to rank 1.
    # Without the distributed sampler, both GPUs each get the full dataset.
    sampler = DistributedSampler(train_dset, num_replicas=world_size, rank=rank, shuffle=False, drop_last=False)
    train_loader = DataLoader(train_dset, batch_size=64, sampler=sampler)
    # for evaluation, there is no need to distribute samples
    test_loader = DataLoader(test_dset, shuffle=True, batch_size=64)
    # create the model and move it to the GPU with id device_id
    model = MnistModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    print(f"(device_id {device_id}) Start training")
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device_id), target.to(device_id)
        output = ddp_model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        # Small test to ensure that both GPUs contribute to the learning of the shared model:
        # device 1 applies the optimizer step for its 50% of the data (== 468 batches),
        # whereas device 0 does so for only 2 batches. When evaluating on the GPUs, the
        # accuracy on device 0 is lower than on device 1.
        if device_id == 1 or (device_id == 0 and batch_idx < 2):
            optimizer.step()
            optimizer.zero_grad()
        print(f"(device_id {device_id}) Batch: {batch_idx}. ", end='\r')
print(f"\n(device_id {device_id}) Start evaluation *on the gpu's*")
ddp_model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device_id), target.to(device_id)
output = ddp_model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'(device_id {device_id}) Accuracy: {100. * correct / len(test_loader.dataset)}')
dist.destroy_process_group()
print(f"(device_id {device_id}) Finished running basic DDP example.")
if __name__ == "__main__":
demo_basic()