Hi everyone,
I had been using a library to handle DDP for me, but it had enough bugs that working around them was slowing down my research, so I decided to refactor my code into pure PyTorch and build my own simple trainer for my custom pipeline.
I wanted to implement DDP to spread large batches across multiple GPUs. After spending some quality time on it, I managed to get a working DDP example running on MNIST. The issue: when I compare GPU usage between running on one GPU and on multiple GPUs, both cases show ~810MB of GPU memory used per Titan X. I wasn't expecting this, since with DDP working correctly each GPU processes a smaller per-GPU batch, so I expected per-GPU memory usage to go down.
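For reference, here is a minimal sketch of how per-rank memory could be checked from inside the script (the log_gpu_memory helper is just for illustration, not part of my pipeline; note that torch.cuda.memory_allocated only counts tensors tracked by PyTorch's caching allocator, while nvidia-smi also includes the CUDA context and cached allocator blocks):

import torch
import torch.distributed as dist

def log_gpu_memory(tag=""):
    # illustrative helper: report this rank's tensor memory in MB
    rank = dist.get_rank()
    allocated = torch.cuda.memory_allocated() / 2**20   # live tensors
    peak = torch.cuda.max_memory_allocated() / 2**20    # peak since startup
    print(f"[rank {rank}] {tag}: allocated={allocated:.0f}MB peak={peak:.0f}MB")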
Here is the code that I was testing on:
# credits:
# how to use DDP module with DDP sampler: https://gist.github.com/sgraaf/5b0caa3a320f28c27c12b5efeb35aa4c
# how to setup a basic DDP example from scratch: https://pytorch.org/tutorials/intermediate/dist_tuto.html
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import random
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
def get_dataset():
    world_size = dist.get_world_size()
    train_set = datasets.MNIST('./data', train=True, download=True,
                               transform=transforms.Compose([
                                   transforms.ToTensor(),
                                   transforms.Normalize((0.1307,), (0.3081,))
                               ]))
    val_set = datasets.MNIST('./data', train=False, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    train_sampler = DistributedSampler(train_set)
    val_sampler = DistributedSampler(val_set)
    # split the global batch of 128 evenly across the ranks
    batch_size = int(128 / float(world_size))
    train_loader = DataLoader(
        dataset=train_set,
        sampler=train_sampler,
        batch_size=batch_size
    )
    val_loader = DataLoader(
        dataset=val_set,
        sampler=val_sampler,
        batch_size=batch_size
    )
    return train_loader, val_loader, batch_size
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
def average_gradients(model):
    # note: DDP already all-reduces and averages gradients during backward(),
    # so this manual reduction is redundant for a DDP-wrapped model
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size
def reduce_dict(input_dict, average=True):
    world_size = float(dist.get_world_size())
    names, values = [], []
    for k in sorted(input_dict.keys()):
        names.append(k)
        values.append(input_dict[k])
    values = torch.stack(values, dim=0)
    dist.all_reduce(values, op=dist.ReduceOp.SUM)
    if average:
        values /= world_size
    reduced_dict = {k: v for k, v in zip(names, values)}
    return reduced_dict
def train(model, train_loader, optimizer, batch_size):
    device = torch.device(f"cuda:{dist.get_rank()}")
    # the DistributedSampler shards the dataset, so len(train_loader)
    # is the number of batches this rank actually processes
    train_num_batches = len(train_loader)
    model.train()
    # let all processes sync up before starting with a new epoch of training
    dist.barrier()
    train_loss = 0.0
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        # average the scalar loss across ranks for logging
        loss_ = {'loss': torch.tensor(loss.item()).to(device)}
        train_loss += reduce_dict(loss_)['loss'].item()
        loss.backward()
        average_gradients(model)
        optimizer.step()
    train_loss_val = train_loss / train_num_batches
    return train_loss_val
def val(model, val_loader, batch_size):
    device = torch.device(f"cuda:{dist.get_rank()}")
    # number of batches this rank actually processes
    val_num_batches = len(val_loader)
    model.eval()
    # let all processes sync up before starting validation
    dist.barrier()
    val_loss = 0.0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = F.nll_loss(output, target)
            loss_ = {'loss': torch.tensor(loss.item()).to(device)}
            val_loss += reduce_dict(loss_)['loss'].item()
    val_loss_val = val_loss / val_num_batches
    return val_loss_val
def run(rank, world_size):
    device = torch.device(f"cuda:{rank}")
    torch.manual_seed(1234)
    train_loader, val_loader, batch_size = get_dataset()
    model = Net().to(device)
    model = DDP(model, device_ids=[rank], output_device=rank)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    history = {
        "rank": rank,
        "train_loss_val": [],
        "train_acc_val": [],
        "val_loss_val": [],
        "val_acc_val": []
    }
    for epoch in range(10):
        # reshuffle the sampler's shard for this epoch
        train_loader.sampler.set_epoch(epoch)
        train_loss_val = train(model, train_loader, optimizer, batch_size)
        val_loss_val = val(model, val_loader, batch_size)
        print(f'Rank {rank} epoch {epoch}: {train_loss_val:.2f}/{val_loss_val:.2f}')
        history['train_loss_val'].append(train_loss_val)
        history['val_loss_val'].append(val_loss_val)
    print(f'Rank {rank} finished training')
    print(history)
def setup_for_distributed(is_master):
    """
    This function disables printing when not in the master process.
    """
    import builtins as __builtin__
    builtin_print = __builtin__.print

    def print(*args, **kwargs):
        force = kwargs.pop('force', False)
        if is_master or force:
            builtin_print(*args, **kwargs)

    __builtin__.print = print
def init_process(
    rank,              # rank of the process
    world_size,        # number of workers
    fn,                # function to be run
    # backend='gloo',  # good for a single node
    backend='nccl'     # the best for CUDA
):
    # information used by rank 0 to set up the process group
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    dist.barrier()
    setup_for_distributed(rank == 0)
    fn(rank, world_size)
if __name__ == "__main__":
    world_size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
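For completeness, the same thing could be launched with torch.multiprocessing.spawn instead of the manual Process loop above; this is just an equivalent launcher sketch (spawn passes the process index as the first argument to init_process):

if __name__ == "__main__":
    world_size = 2
    # spawn one process per rank; each calls init_process(rank, world_size, run)
    mp.spawn(init_process, args=(world_size, run), nprocs=world_size, join=True)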
Thanks in advance for your help!