[SOLVED] DDP isn't working as expected

Hi everyone,

I have been using a library to enable me to do DDP but I have found out that it was hard dealing with bugs as that library had many which slowed down my research process, so I have decided to refactor my code into pure PyTorch and build my own simple trainer for my custom pipeline.

I wanted to implement DDP to utilize multiple GPUs for training large batches. After spending some quality time, I have managed to produce a working example of DDP on MNIST. The issue is that when I compared GPU usage running one GPU vs. multiple GPUs, it seems that both are utilizing ~810MB of GPU memory on a Titan X GPU. I wasn’t expecting this: when utilizing DDP correctly, I expected per-GPU memory usage to go down, since each GPU processes a smaller slice of the batch.

Here is the code that I was testing on:

# credits:
# how to use DDP module with DDP sampler: https://gist.github.com/sgraaf/5b0caa3a320f28c27c12b5efeb35aa4c
# how to setup a basic DDP example from scratch: https://pytorch.org/tutorials/intermediate/dist_tuto.html
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import random

from torch.nn.parallel import DistributedDataParallel as DDP

from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
import math

def get_dataset():
    """Build distributed train/val MNIST loaders.

    Requires torch.distributed to be initialized: the world size is used to
    split the global batch of 128 evenly across processes, and
    DistributedSampler gives each rank a disjoint shard of the data.

    Returns:
        (train_loader, val_loader, per_process_batch_size)
    """
    world_size = dist.get_world_size()
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])
    train_set = datasets.MNIST('./data', train=True, download=True,
                               transform=transform)
    val_set = datasets.MNIST('./data', train=False, download=True,
                             transform=transform)
    train_sampler = DistributedSampler(train_set)
    val_sampler = DistributedSampler(val_set)
    # Keep the *global* batch size at 128 regardless of the number of GPUs.
    batch_size = int(128 / float(world_size))
    train_loader = DataLoader(train_set, batch_size=batch_size,
                              sampler=train_sampler)
    val_loader = DataLoader(val_set, batch_size=batch_size,
                            sampler=val_sampler)

    return train_loader, val_loader, batch_size
class Net(nn.Module):
    """Small LeNet-style CNN for 28x28 single-channel MNIST images.

    Returns per-class log-probabilities of shape (batch, 10).
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        # 320 = 20 channels * 4 * 4 spatial after two conv+pool stages.
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # dim=1: normalize over the class dimension (implicit dim is
        # deprecated and warns in modern PyTorch).
        return F.log_softmax(x, dim=1)
def average_gradients(model):
    """Replace each parameter's gradient with the mean gradient across all
    ranks (all-reduce sum, then divide by the world size)."""
    world_size = float(dist.get_world_size())
    for p in model.parameters():
        dist.all_reduce(p.grad.data, op=dist.ReduceOp.SUM)
        p.grad.data /= world_size
def reduce_dict(input_dict, average=True):
    """All-reduce the values of a {name: scalar tensor} dict across ranks.

    Args:
        input_dict: mapping of name -> stackable tensor on this rank's device.
        average: if True return the mean across ranks, otherwise the sum.

    Returns:
        A new dict with the same keys and the reduced values.
    """
    world_size = float(dist.get_world_size())
    names, values = [], []
    # Iterate keys in sorted order so every rank stacks tensors identically,
    # which all_reduce requires.
    for k in sorted(input_dict.keys()):
        names.append(k)
        values.append(input_dict[k])
    values = torch.stack(values, dim=0)
    dist.all_reduce(values, op=dist.ReduceOp.SUM)
    if average:
        values /= world_size
    reduced_dict = {k: v for k, v in zip(names, values)}
    return reduced_dict
def train(model, train_loader, optimizer, batch_size):
    """Run one training epoch on this rank's data shard.

    Returns the epoch's mean loss, averaged across all ranks via reduce_dict.

    Note: the originally posted loop never called zero_grad/backward/step,
    so the model was never updated — fixed here.
    """
    device = torch.device(f"cuda:{dist.get_rank()}")
    train_num_batches = int(math.ceil(len(train_loader.dataset) / float(batch_size)))
    model.train()
    train_loss = 0.0
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        # DDP averages gradients across ranks automatically during backward().
        loss.backward()
        optimizer.step()
        # Detach the scalar loss before the cross-rank reduction for logging.
        loss_ = {'loss': torch.tensor(loss.item()).to(device)}
        train_loss += reduce_dict(loss_)['loss'].item()
    train_loss_val = train_loss / train_num_batches
    return train_loss_val
def val(model, val_loader, batch_size):
    """Run one validation epoch on this rank's shard.

    Returns the epoch's mean loss, averaged across all ranks via reduce_dict.
    """
    device = torch.device(f"cuda:{dist.get_rank()}")
    val_num_batches = int(math.ceil(len(val_loader.dataset) / float(batch_size)))
    # eval() disables dropout so validation is deterministic; the original
    # code evaluated with dropout still active.
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = F.nll_loss(output, target)
            loss_ = {'loss': torch.tensor(loss.item()).to(device)}
            val_loss += reduce_dict(loss_)['loss'].item()
    val_loss_val = val_loss / val_num_batches
    return val_loss_val
def run(rank, world_size):
    """Per-process entry point: build loaders, wrap the model in DDP,
    and train for 10 epochs.

    Must be called after dist.init_process_group; each process drives
    exactly one GPU (cuda:rank).
    """
    device = torch.device(f"cuda:{rank}")
    train_loader, val_loader, batch_size = get_dataset()
    model = Net().to(device)
    model = DDP(model, device_ids=[rank], output_device=rank)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    history = {
        "rank": rank,
        "train_loss_val": [],
        "train_acc_val": [],
        "val_loss_val": [],
        "val_acc_val": [],
    }
    for epoch in range(10):
        # set_epoch reseeds the sampler's shuffle so each epoch sees the
        # data in a different order across ranks.
        train_loader.sampler.set_epoch(epoch)
        train_loss_val = train(model, train_loader, optimizer, batch_size)
        val_loss_val = val(model, val_loader, batch_size)
        history["train_loss_val"].append(train_loss_val)
        history["val_loss_val"].append(val_loss_val)
        print(f'Rank {rank} epoch {epoch}: {train_loss_val:.2f}/{val_loss_val:.2f}')
    print(f'Rank {rank} finished training')

def setup_for_distributed(is_master):
    """Disable printing when not in the master process.

    Replaces the builtin print globally. Non-master ranks can still emit
    output by passing force=True to print().
    """
    import builtins as __builtin__
    builtin_print = __builtin__.print

    def print(*args, **kwargs):
        force = kwargs.pop('force', False)
        if is_master or force:
            builtin_print(*args, **kwargs)

    __builtin__.print = print
def init_process(
        rank,           # rank of this process
        world_size,     # total number of worker processes
        fn,             # function to run once the process group is up
        # backend='gloo',  # good for CPU / single node
        backend='nccl'  # the best backend for CUDA
):
    """Initialize the default process group, then call fn(rank, world_size).

    MASTER_ADDR/MASTER_PORT tell every rank where rank 0 listens for the
    rendezvous; 127.0.0.1 works for single-node multi-GPU training.
    """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    # Silence print on all ranks except rank 0.
    setup_for_distributed(rank == 0)
    fn(rank, world_size)

if __name__ == "__main__":
    # Spawn one worker process per GPU; each calls init_process -> run.
    world_size = 2
    processes = []
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, run))
        p.start()
        processes.append(p)

    # Wait for every worker to finish before exiting the parent.
    for p in processes:
        p.join()
Thanks in advance for your help! :smiley:

It seems that this is expected as the weights are shared through DDP. I don’t know why this wasn’t obvious to me when I was looking at memory usage.

I was testing with smaller batch sizes, so I couldn’t see the impact of increasing/decreasing the batch size (I was testing on MNIST), and it slipped my mind that roughly 780 MB is taken by the model’s weights and CUDA context, not by loading the batch onto the GPU.

Here is a simple example that shows the difference.

# batch_size: 1GPU vs. mem. usage on 1 of 2GPUs
# 512: 819 vs. 789
# 2048: 959 vs. 885
# 10000: 1687 vs. 1253

Thanks to @marksaroufim for his help!