DDP with multiple models

Hi,

I’m trying to train two models A and B on 4 GPUs, each on 2 GPUs (so DDP is needed). The two models are independent, but they need to exchange some information during training (not gradients), so I would like to start everything with a single torch.distributed.launch command so that the workers can communicate with each other. What is the correct way to do this?

Thanks in advance!

You can launch 4 processes (1 per GPU) and initialize a process group of world_size 4. You can use this process group to exchange data between A and B.

Then, for each model, create a new subgroup with torch.distributed.new_group (see the torch.distributed documentation for PyTorch 1.9.0) on GPUs 0,1 and GPUs 2,3 respectively (i.e. ranks [0, 1] and [2, 3] with one process per GPU), and pass that process group to DDP.


Hi! I am currently trying the same thing for a GAN.

@pritamdamania87 Could you elaborate on how I would pass that new process group to DDP after creating it?

I am currently trying it this way:

if "WORLD_SIZE" in os.environ:
    world_size = int(os.environ["WORLD_SIZE"])
ngpus_per_node = torch.cuda.device_count()

if world_size > 1:
    if 'SLURM_PROCID' in os.environ:  # for slurm scheduler
        rank = int(os.environ['SLURM_PROCID'])
        gpu = rank % torch.cuda.device_count()

    dist.init_process_group(backend='nccl', init_method='env://',
                            world_size=world_size, rank=rank)

    model.cuda(gpu)#(gpu)
    group_model = torch.distributed.new_group(ranks=[0,1])
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[0,1])
    model2.cuda(gpu)
    group_model2 = torch.distributed.new_group(ranks=[2,3])
    D = torch.nn.parallel.DistributedDataParallel(model2, device_ids=[2,3])

But I am running into various kinds of errors.

You need to make these changes:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[0,1], process_group=group_model)
D = torch.nn.parallel.DistributedDataParallel(model2, device_ids=[2,3], process_group=group_model2)
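For the information exchange asked about in the original question, the default group created by init_process_group already spans all four ranks. A minimal sketch, assuming exactly two models on ranks [0, 1] and [2, 3], that each rank is paired with the rank at the same position in the other block, and that the exchanged tensor has the same shape and dtype on every rank. `peer_rank` and `exchange_with_peer` are hypothetical helpers. It uses `dist.all_gather`, which delivers every rank's tensor to every rank; point-to-point `dist.send`/`dist.recv` would move less data where the backend supports it.

```python
def peer_rank(rank, per_model):
    """Rank in the *other* model's block at the same position.

    Assumes exactly two models on 2 * per_model ranks, e.g. with
    per_model=2: 0 <-> 2 and 1 <-> 3.
    """
    return (rank + per_model) % (2 * per_model)


def exchange_with_peer(tensor, per_model):
    """Share `tensor` with all ranks and return the peer rank's tensor.

    This is a collective on the default (world) group, so all ranks of
    both models must call it at the same point in training. With NCCL,
    `tensor` must live on this rank's GPU.
    """
    # Imported lazily so peer_rank can be used without torch installed.
    import torch
    import torch.distributed as dist

    gathered = [torch.empty_like(tensor) for _ in range(dist.get_world_size())]
    dist.all_gather(gathered, tensor)  # group=None -> default world group
    return gathered[peer_rank(dist.get_rank(), per_model)]
```

For example, `peer_info = exchange_with_peer(my_info, per_model=2)` would be called on all four ranks in the same iteration; ranks 0 and 1 then hold data from ranks 2 and 3, and vice versa.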

Thank you so much! That helped me get further, but now I am running into more problems. I am trying to run my code on an HPC cluster with Slurm. My whole code looks like this:

import os
import torch
import torch.nn as nn
import torch.distributed as dist


class Generator(nn.Module):

    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            nn.ConvTranspose2d(512, 256, (4, 4), stride=(2, 2), padding=(0, 0)),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(256, 128, (5, 5), (1, 1)),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(128, 64, (2, 2), (2, 2)),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(64, 32, (2, 2), stride=(2, 2)),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(32, 2, (1, 1), stride=(1, 1)),
            nn.BatchNorm2d(2),
            nn.LeakyReLU(),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.main(x)

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            nn.Conv2d(2, 32, (1,1), stride=(1,1), padding=0, padding_mode='circular'),
            nn.LayerNorm(32),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.5),
            nn.Conv2d(32, 64, (5,5), stride=(2,2), padding=2, padding_mode='circular'),
            nn.LayerNorm(16),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, (5,5), stride=(2,2), padding=2, padding_mode='circular'),
            nn.LayerNorm(8),
            nn.LeakyReLU(0.2),
            nn.Conv2d(128, 256, (5,5), stride=(2,2), padding=2, padding_mode='circular'),
            nn.LayerNorm(4),
            nn.LeakyReLU(0.2),
            nn.Conv2d(256, 1, (4,4), stride=(1,1), padding=0, padding_mode='circular')
        )

    def forward(self, x):
        return self.main(x)



if __name__ == '__main__':
    # DDP setting
    if "WORLD_SIZE" in os.environ:
        world_size = int(os.environ["WORLD_SIZE"])
    ngpus_per_node = torch.cuda.device_count()

    if world_size > 1:
        if 'SLURM_PROCID' in os.environ:  # for slurm scheduler
            rank = int(os.environ['SLURM_PROCID'])
            gpu = rank % torch.cuda.device_count()
            print("gpu", gpu, "rank", rank, "ngpus_per_node", ngpus_per_node)
        dist.init_process_group(backend='nccl', init_method='env://',
                                world_size=world_size, rank=rank)
        group_g = torch.distributed.new_group(ranks=[0, 1])
        group_d = torch.distributed.new_group(ranks=[2, 3])

        if gpu is not None:
            torch.cuda.set_device(gpu)
            with torch.cuda.device(0):
                G = Generator().cuda()
                D = Discriminator().cuda()
            G = torch.nn.parallel.DistributedDataParallel(G, device_ids=[0,1], process_group=group_g,broadcast_buffers=True)
            D = torch.nn.parallel.DistributedDataParallel(D, device_ids=[2,3],process_group=group_d,broadcast_buffers=True)

And the error I get is this:

Traceback (most recent call last):
  File "mini.py", line 85, in <module>
    G = torch.nn.parallel.DistributedDataParallel(G, device_ids=[0,1], process_group=group_g,broadcast_buffers=True)
  File "/mpcdf/soft/SLE_15/packages/skylake/pytorch/gpu-cuda-11.2/anaconda_3_2020.02-2020.02/1.8.1/lib/python3.7/site-packages/torch/nn/parallel/distributed.py", line 446, in __init__
    self._sync_params_and_buffers(authoritative_rank=0)
  File "/mpcdf/soft/SLE_15/packages/skylake/pytorch/gpu-cuda-11.2/anaconda_3_2020.02-2020.02/1.8.1/lib/python3.7/site-packages/torch/nn/parallel/distributed.py", line 460, in _sync_params_and_buffers
    authoritative_rank)
  File "/mpcdf/soft/SLE_15/packages/skylake/pytorch/gpu-cuda-11.2/anaconda_3_2020.02-2020.02/1.8.1/lib/python3.7/site-packages/torch/nn/parallel/distributed.py", line 1156, in _distributed_broadcast_coalesced
    self.process_group, tensors, buffer_size, authoritative_rank
TypeError: _broadcast_coalesced(): incompatible function arguments. The following argument types are supported:
    1. (process_group: torch._C._distributed_c10d.ProcessGroup, tensors: List[at::Tensor], buffer_size: int, src: int = 0) -> None

Invoked with: <object object at 0x14c74759bb00>, [tensor([[[[-4.2067e-03, -7.6654e-03, 1.2881e-02, -1.5023e-02],
[ 6.4325e-03, 1.0096e-02, 1.0314e-02, 4.2596e-03],

[[ 0.1669]]]], device='cuda:0'), tensor([ 0.2087, -0.3322], device='cuda:0'), tensor([1., 1.], device='cuda:0'), tensor([0., 0.], device='cuda:0'), tensor([0., 0.], device='cuda:0'), tensor([1., 1.], device='cuda:0'), tensor(0, device='cuda:0')], 262144000, 0
srun: error: ravg1017: tasks 0-1,2,3: Exited with exit code 1
slurmstepd: error: *** STEP 1387251.0 ON ravg1017 CANCELLED AT 2022-02-24T15:14:14 ***

So I solved it by trial and error. Changing the main to the following works:

if __name__ == '__main__':
    # DDP setting
    if "WORLD_SIZE" in os.environ:
        world_size = int(os.environ["WORLD_SIZE"])
    ngpus_per_node = torch.cuda.device_count()

    if world_size > 1:
        if 'SLURM_PROCID' in os.environ:  # for slurm scheduler
            rank = int(os.environ['SLURM_PROCID'])
            gpu = rank % torch.cuda.device_count()
            print("gpu", gpu, "rank", rank, "ngpus_per_node", ngpus_per_node)
        torch.cuda.set_device(gpu)
        dist.init_process_group(backend='nccl', init_method='env://',
                                world_size=world_size, rank=rank)
        group_g = torch.distributed.new_group(ranks=[0,1,2,3])
        group_d = torch.distributed.new_group(ranks=[0,1,2,3])

        if gpu is not None:
            #torch.cuda.set_device(gpu)
            #with torch.cuda.device(0):
            G = Generator().cuda()
            D = Discriminator().cuda()
            G = torch.nn.parallel.DistributedDataParallel(G, device_ids=[gpu], process_group=group_g,broadcast_buffers=True)
            D = torch.nn.parallel.DistributedDataParallel(D, device_ids=[gpu], process_group=group_d,broadcast_buffers=True)
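Two changes seem to be what makes this version work. First, each process now passes a single device, device_ids=[gpu], which is what DDP expects with one process per GPU, and the models are placed on that GPU instead of cuda:0. Second, the `<object object at 0x14c74759bb00>` in the earlier error is most likely the placeholder that new_group returns on ranks outside the requested group, so ranks 2 and 3 were handing G's DDP an unusable group. Since both groups now contain all four ranks, they are equivalent to the default group: G and D are each trained data-parallel across all 4 GPUs rather than split 2/2, and the new_group calls can probably be dropped. A sketch of that simplification, assuming the Slurm batch script exports WORLD_SIZE, MASTER_ADDR and MASTER_PORT (env:// needs the last two); `slurm_rank_and_gpu` and `setup_gan` are hypothetical helpers:

```python
import os


def slurm_rank_and_gpu(env, ngpus_per_node):
    """Global rank from SLURM_PROCID and the GPU it maps to on its node.

    Mirrors the `rank % torch.cuda.device_count()` mapping in the post.
    """
    rank = int(env["SLURM_PROCID"])
    return rank, rank % ngpus_per_node


def setup_gan(generator_cls, discriminator_cls):
    # Lazy imports keep slurm_rank_and_gpu usable without torch.
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP

    world_size = int(os.environ["WORLD_SIZE"])
    rank, gpu = slurm_rank_and_gpu(os.environ, torch.cuda.device_count())

    # Bind this process to its GPU before creating the NCCL group.
    torch.cuda.set_device(gpu)
    # env:// also reads MASTER_ADDR and MASTER_PORT from the environment.
    dist.init_process_group(backend="nccl", init_method="env://",
                            world_size=world_size, rank=rank)

    # No process_group argument: DDP uses the default group (all ranks),
    # which is what new_group(ranks=[0, 1, 2, 3]) amounted to here.
    G = DDP(generator_cls().cuda(gpu), device_ids=[gpu])
    D = DDP(discriminator_cls().cuda(gpu), device_ids=[gpu])
    return G, D
```

Usage would be `G, D = setup_gan(Generator, Discriminator)` inside the `__main__` block.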

Hi @pritamdamania87, thanks for your input on this. I have tried launching several independent experiments (models A,B,C,D…) on different GPUs and they work fine.

Now, to make better use of GPU memory, I have tried running several experiments on the same GPUs (e.g. A and B, both with DDP on GPUs 0,1). Training results are as expected, but I see a significant slowdown in the backward pass and a slight slowdown in the forward pass for both A and B. The total number of spawned processes is also well within the number of CPU cores. Is there any way to optimize this for speed, or is this a bottleneck in how NCCL passes messages between GPUs?

Thanks in advance!

Now in order to utilize my GPU ram better, I have tried overloading several experiments on the same GPUs (e.g. A, B with DDP on GPUs 0,1)

I wouldn’t recommend doing this unless you somehow ensure that only one DDP instance is using the GPUs at a time. Running multiple processes on the same GPUs can result in deadlocks.