Hi,
I'm currently trying to figure out how to properly implement DDP with cleanup, barrier, and its expected output. While I think the DDP tutorial (Getting Started with Distributed Data Parallel — PyTorch Tutorials 1.11.0+cu102 documentation) gives a great initial example of how to do this, I'm having some trouble translating that example into something more illustrative. I've chosen to translate the PyTorch CIFAR example (Training a Classifier — PyTorch Tutorials 1.11.0+cu102 documentation) into the form of the above-mentioned DDP tutorial.
For context: I’m currently running on a single node with 4 GPUs:
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 510.47.03    Driver Version: 510.47.03    CUDA Version: 11.6     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Fan  Temp  Perf  Pwr:Usage/Cap              |
|===============================+==============================================|
|   0  Tesla V100S-PCI...  Off  | N/A   35C    P0    37W / 250W               |
|   1  Tesla V100S-PCI...  Off  | N/A   33C    P0    36W / 250W               |
|   2  Tesla V100S-PCI...  Off  | N/A   32C    P0    35W / 250W               |
|   3  Tesla V100S-PCI...  Off  | N/A   32C    P0    37W / 250W               |
+-------------------------------+----------------------+----------------------+
The code I’m implementing can be found below (with many of the comments from the original CIFAR example removed for brevity of this post):
import os
import sys
import tempfile
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
def imshow(img):
img = img / 2 + 0.5 # unnormalize
npimg = img.numpy()
plt.imshow(np.transpose(npimg, (1, 2, 0)))
plt.show()
class Net(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def generate_data():
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
batch_size = 4
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
shuffle=True)#, num_workers=2)
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
shuffle=False)#, num_workers=2)
classes = ('plane', 'car', 'bird', 'cat',
'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
#dataiter = iter(trainloader)
#images, labels = dataiter.next()
#imshow(torchvision.utils.make_grid(images))
# print labels
#print(' '.join(f'{classes[labels[j]]:5s}' for j in range(batch_size)))
return trainset, trainloader, testset, testloader, classes
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
trainset, trainloader, testset, testloader, classes = generate_data()
net = Net().to(rank)
ddp_model = DDP(net, device_ids=[rank])
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001, momentum=0.9)
for epoch in range(2):
running_loss = 0.0
for i, data in enumerate(trainloader):
inputs, labels = data
optimizer.zero_grad()
labels = labels.to(rank)
outputs = ddp_model(inputs.to(rank))
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
# Should cleanup be here or below?
#cleanup()
if i % 2000 == 1999:
print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
running_loss = 0.0
break
# Should cleanup be here or above?
cleanup()
print('Finished Training')
def demo_checkpoint(rank, world_size):
print(f"Running checkpoint DDP example on rank {rank}.")
setup(rank, world_size)
trainset, trainloader, testset, testloader, classes = generate_data()
net = Net().to(rank)
ddp_model = DDP(net, device_ids=[rank])
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001, momentum=0.9)
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location))
for epoch in range(2):
running_loss = 0.0
for i, data in enumerate(trainloader):
inputs, labels = data
optimizer.zero_grad()
labels = labels.to(rank)
outputs = ddp_model(inputs.to(rank))
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
# Should cleanup be here or below?
#cleanup()
if i % 2000 == 1999:
print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
running_loss = 0.0
break
# Should the model be saved before or after cleanup?
#
#Saving the model after each epoch
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Should cleanup be here or above?
#cleanup()
# Now that we are done with training, do we now cleanup the processes again?
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
print('Finished Training')
#PATH = './cifar_net.pth'
#torch.save(net.state_dict(), PATH)
dataiter = iter(testloader)
images, labels = dataiter.next()
#imshow(torchvision.utils.make_grid(images))
#print('GroundTruth: ', ' '.join(f'{classes[labels[j]]:5s}' for j in range(4)))
# Reloading the model to the rank and wrapping it in DDP again, since we have
# already executed "cleanup" on the model
# create local model
net = Net().to(rank)
ddp_model = DDP(net, device_ids=[rank])
# Loading our model now that we have redefined it and re-wrapped it in DDP
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location))
outputs = ddp_model(images)
_, predicted = torch.max(outputs, 1)
print('Predicted: ', ' '.join(f'{classes[predicted[j]]:5s}'
for j in range(4)))
correct = 0
total = 0
with torch.no_grad():
for data in testloader:
images, labels = data
labels = labels.to(rank)
outputs = ddp_model(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f'Accuracy of the network on the 10000 test images: {100 * correct // total} %')
correct_pred = {classname: 0 for classname in classes}
total_pred = {classname: 0 for classname in classes}
with torch.no_grad():
for data in testloader:
images, labels = data
labels = labels.to(rank)
outputs = ddp_model(images)
_, predictions = torch.max(outputs, 1)
for label, prediction in zip(labels, predictions):
labels = labels.to(rank)
if label == prediction:
correct_pred[classes[label]] += 1
total_pred[classes[label]] += 1
for classname, correct_count in correct_pred.items():
accuracy = 100 * float(correct_count) / total_pred[classname]
print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')
class NetMP(nn.Module):
def __init__(self, dev0, dev1, dev2, dev3):
super(NetMP, self).__init__()
#super().__init__()
self.dev0 = dev0
self.dev1 = dev1
self.dev2 = dev2
self.dev3 = dev3
self.conv1 = nn.Conv2d(3, 6, 5).to(dev0)
self.pool = nn.MaxPool2d(2, 2).to(dev1)
self.conv2 = nn.Conv2d(6, 16, 5).to(dev2)
self.fc1 = nn.Linear(16 * 5 * 5, 120).to(dev3)
self.fc2 = nn.Linear(120, 84).to(dev1)
self.fc3 = nn.Linear(84, 10).to(dev2)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1) # flatten all dimensions except batch
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def demo_model_parallel(rank, world_size):
# For the sake of reducing redundancy, let's implement demo_model_parallel as a
# version of demo_basic, not demo_checkpoint
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
dev0 = ((rank * 4) % 4)
dev1 = ((rank * 4 + 1) % 4)
dev2 = ((rank * 4 + 2) % 4)
dev3 = ((rank * 4 + 3) % 4)
trainset, trainloader, testset, testloader, classes = generate_data()
net = NetMP(dev0, dev1, dev2, dev3)
ddp_model = DDP(net)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001, momentum=0.9)
for epoch in range(2):
running_loss = 0.0
for i, data in enumerate(trainloader):
inputs, labels = data
optimizer.zero_grad()
labels = labels.to(dev1)
outputs = ddp_model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
# Should cleanup be here or below?
#cleanup()
if i % 2000 == 1999:
print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
running_loss = 0.0
break
# Should cleanup be here or above?
cleanup()
print('Finished Training')
if __name__=="__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
run_demo(demo_model_parallel, world_size)
To simplify our work, let’s first make the small adjustment of:
run_demo(demo_basic, world_size)
#run_demo(demo_checkpoint, world_size)
#run_demo(demo_model_parallel, world_size)
Keeping an eye on our GPUs we see:
+-------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================|
| 0 N/A N/A 0000001 C 307MiB |
| 0 N/A N/A 0000001 C ...onda3/envs/env/bin/python 917MiB |
| 0 N/A N/A 0000001 C ...onda3/envs/env/bin/python 905MiB |
| 0 N/A N/A 0000001 C ...onda3/envs/env/bin/python 905MiB |
| 0 N/A N/A 0000001 C ...onda3/envs/env/bin/python 905MiB |
| 1 N/A N/A 0000001 C 307MiB |
| 1 N/A N/A 0000001 C ...onda3/envs/env/bin/python 1403MiB |
| 2 N/A N/A 0000001 C 307MiB |
| 2 N/A N/A 0000001 C ...onda3/envs/env/bin/python 1403MiB |
| 3 N/A N/A 0000001 C 307MiB |
| 3 N/A N/A 0000001 C ...onda3/envs/env/bin/python 1403MiB |
+-------------------------------------------------------------------------+
and the Volatile GPU-Util values on all GPUs stay below 100%. Now our terminal output looks like:
Running basic DDP example on rank 0.
Running basic DDP example on rank 1.
Running basic DDP example on rank 2.
Running basic DDP example on rank 3.
[1, 2000] loss: 2.181
[1, 2000] loss: 2.187
[1, 2000] loss: 2.180
[1, 2000] loss: 2.187
[2, 2000] loss: 1.740
[2, 2000] loss: 1.737
[2, 2000] loss: 1.739
Finished Training
[2, 2000] loss: 1.737
Finished Training
Finished Training
Finished Training
Here is my first issue: if the work is being distributed and run in parallel, shouldn't we see a single loss per epoch rather than four? From the output it appears that, instead of rank 0 handing the forward and backward work to the other ranks and then collecting the results into one number for the user, each rank is running its own forward and backward passes and printing its own result; in effect, while the work runs in parallel, the ranks are operating independently rather than together. If that's the case, this setup would give no speedup, since the run can only finish as fast as the slowest rank.
Is my interpretation of what's going on correct, or am I missing something? Is this the expected behavior of DDP in this situation, or is my code incorrect? Any and all thoughts are welcome.
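For what it's worth, the behavior I expected was that each rank would see a different shard of the training data and that a single aggregated loss would be reported. My generate_data() above doesn't use a DistributedSampler, so each rank is iterating over the full training set on its own. A rough sketch of what I had in mind is below (untested; the helper names make_sharded_loader and print_global_loss are just made up for this post):
import torch
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler

def make_sharded_loader(trainset, batch_size, rank, world_size):
    # Give each rank a distinct shard of the training set instead of the full dataset.
    # (sampler.set_epoch(epoch) should be called once per epoch so shuffling differs.)
    sampler = DistributedSampler(trainset, num_replicas=world_size, rank=rank, shuffle=True)
    return torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=sampler)

def print_global_loss(running_loss, n_batches, rank, world_size, epoch, i):
    # Average the per-rank running loss across all ranks so that a single number
    # is printed (by rank 0) instead of one number per rank.
    loss_tensor = torch.tensor([running_loss / n_batches], device=f'cuda:{rank}')
    dist.all_reduce(loss_tensor, op=dist.ReduceOp.SUM)
    if rank == 0:
        print(f'[{epoch + 1}, {i + 1:5d}] loss: {loss_tensor.item() / world_size:.3f}')
With a sampler like this I'd also expect each rank to run roughly a quarter of the iterations, which is where I assumed the speedup would come from.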
Let’s go ahead and run the same code with the small change of:
#run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
#run_demo(demo_model_parallel, world_size)
This pushes the Volatile GPU-Util on all of our GPUs to 100%, and our terminal output now stalls at
Running checkpoint DDP example on rank 3.
Running checkpoint DDP example on rank 1.
Running checkpoint DDP example on rank 0.
Running checkpoint DDP example on rank 2.
At this point I have to kill the run manually because it stalls completely, yet no error shows up. After a little debugging I find that rank 0 never gets past the line
dist.barrier()
I'm not sure how to debug or fix this, especially since the barrier comes straight from the original DDP example (and its purpose and placement make sense)… Any thoughts?
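One thing I'm planning to try, based on similar threads, is pinning each spawned process to its own GPU before the first NCCL collective runs, either by adding torch.cuda.set_device(rank) to setup() or (on recent PyTorch versions) by passing device_ids=[rank] to dist.barrier(). A minimal sketch of the modified setup(), assuming the hang comes from all ranks defaulting to cuda:0 for the barrier:
import os
import torch
import torch.distributed as dist

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # Assumption: binding this process to its GPU before any collective
    # (including dist.barrier) keeps every rank from ending up on cuda:0.
    torch.cuda.set_device(rank)
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
Does that sound like a plausible explanation for the stall, or is something else going on?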
Let’s go ahead and run the same code with the small change of:
#run_demo(demo_basic, world_size)
#run_demo(demo_checkpoint, world_size)
run_demo(demo_model_parallel, world_size)
after which I am greeted with this error:
Traceback (most recent call last):
File "/user/classical_parallel_compute.py", line 425, in <module>
run_demo(demo_model_parallel, world_size)
File "/user/classical_parallel_compute.py", line 53, in run_demo
mp.spawn(demo_fn,
File "/user/miniconda3/envs/env/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 230, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/user/miniconda3/envs/env/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 188, in start_processes
while not context.join():
File "/user/miniconda3/envs/env/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 150, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:
-- Process 2 terminated with the following error:
Traceback (most recent call last):
File "/user/miniconda3/envs/env/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 59, in _wrap
fn(i, *args)
File "/user/classical_parallel_compute.py", line 378, in demo_model_parallel
ddp_model = DDP(net)#, device_ids=[rank])
File "/user/miniconda3/envs/env/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 578, in __init__
dist._verify_model_across_ranks(self.process_group, parameters)
RuntimeError: NCCL error in: /user/conda/feedstock_root/build_artifacts/pytorch-recipe_1640869844479/work/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp:957, invalid usage, NCCL version 21.1.4
ncclInvalidUsage: This usually reflects invalid usage of NCCL library (such as too many async ops, too many collectives at once, mixing streams in a group, etc).
Running
NCCL_DEBUG=INFO python classical_parallel_compute.py
gives me output that includes warnings like the following:
NCCL WARN Duplicate GPU detected : rank 0 and rank 1 both on CUDA device 3...
NCCL WARN Duplicate GPU detected : rank 3 and rank 0 both on CUDA device 3....
Another post solves this problem via
net.to(f'cuda:{args.local_rank}')
However, you'll notice that in the model-parallelism example for DDP the model is split across several devices rather than moved wholesale to a single cuda device, so I am hesitant to say that the above solution applies here.
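I also wonder whether my own device arithmetic is to blame: (rank * 4) % 4, (rank * 4 + 1) % 4, and so on give the same device indices 0-3 for every rank, so with four processes every rank ends up touching every GPU, which seems consistent with the "Duplicate GPU detected" warnings. If that's the issue, I assume the layout should look more like the tutorial's, where each rank owns a disjoint pair of devices. A sketch of what I mean (untested; NetMP2 is a hypothetical two-device variant of my NetMP, the training loop is elided, and it reuses the imports, setup, and cleanup from the full script above):
class NetMP2(nn.Module):
    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        # convolutional part on dev0, fully connected part on dev1
        self.conv1 = nn.Conv2d(3, 6, 5).to(dev0)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5).to(dev0)
        self.fc1 = nn.Linear(16 * 5 * 5, 120).to(dev1)
        self.fc2 = nn.Linear(120, 84).to(dev1)
        self.fc3 = nn.Linear(84, 10).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1).to(self.dev1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)
    dev0 = rank * 2      # rank 0 -> cuda:0, rank 1 -> cuda:2
    dev1 = rank * 2 + 1  # rank 0 -> cuda:1, rank 1 -> cuda:3
    net = NetMP2(dev0, dev1)
    ddp_model = DDP(net)  # no device_ids for a multi-device module
    # ... training loop as in demo_basic, with labels moved to dev1 ...
    cleanup()

# and in __main__, spawn only two processes for four GPUs:
# run_demo(demo_model_parallel, n_gpus // 2)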
I'm not sure what's going wrong or what needs fixing. I suspect that I'm using cleanup() incorrectly in my attempt to coordinate the behavior of the GPUs on my system, though that may not be the problem at all…
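For reference, the per-worker lifecycle I think the DDP tutorial intends, and which I've been trying to reproduce above, is roughly the following (worker is just a placeholder name; please correct me if I've misread the tutorial):
def worker(rank, world_size):
    setup(rank, world_size)                      # init the process group once per process
    ddp_model = DDP(Net().to(rank), device_ids=[rank])
    # ... full training loop over all epochs ...
    if rank == 0:
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)  # save from one rank only
    dist.barrier()                               # ensure the checkpoint exists before any rank loads it
    # ... optional checkpoint loading / evaluation on each rank ...
    cleanup()                                    # destroy the process group exactly once, at the very end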
Any suggestions for code changes, or advice on what to fix, would be very much appreciated.