In DDP, which process does the all_reduce to synchronize the gradients?

Hi, I just started learning how to do distributed training in PyTorch. I went through the basic tutorial here: Getting Started with Distributed Data Parallel — PyTorch Tutorials 1.9.0+cu102 documentation, and I also read the DDP paper. But I still have some questions.

  1. Suppose I’m spawning 2 processes on 1 machine with 2 GPUs. AFAIK, each process holds a model replica created during DDP construction, and the all_reduce synchronization happens during loss.backward(). So which of the 2 processes is actually doing this reduction work? I can see that during all_reduce the 2 processes need to communicate somehow, but is there a ‘major’ process that does more work, like computing the average of the gradients?
  2. I am using GLOO currently. I just checked Distributed communication package - torch.distributed — PyTorch 1.9.0 documentation, and GLOO on GPU has limited functionality: only 2 supported functions, broadcast and all_reduce. Are these 2 functions enough for a basic DDP example on GPU to work fine?
  3. Also, how can I check whether my all_reduce is done on GPU rather than on CPU? And besides, can I use the GLOO backend to launch 2 processes on CPU to do DDP?
    Much appreciated. I might have misread some of the material, so please help me out.

Edit:
I know I can use nvidia-smi to check GPU usage and see whether both GPUs are working. But the thing is, I have 2 different machines: one has 3 GPUs, the other has 2. My simple DDP script is set up to use only 2 processes. On the first machine, nvidia-smi shows GPU memory usage of around 1.5 GB and 800 MB on the 2 GPUs while the script runs. But on the second machine, with the same script, I only see a 42 MB memory increase on each GPU card. So I’m wondering whether the processes actually run on the GPUs, or whether the work silently falls back to the CPUs because the GPUs were somehow not configured correctly when I built PyTorch from source.
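I suppose I could also check it programmatically with something like this (just a sketch; report_devices is a name I made up), printing from inside each process where the replica’s parameters actually live:

import torch
import torch.distributed as dist

def report_devices(ddp_model, rank):
    # Which device holds this replica's parameters, and which backend is active?
    param_device = next(ddp_model.parameters()).device
    print(f"[rank {rank}] cuda available: {torch.cuda.is_available()}, "
          f"backend: {dist.get_backend()}, params on: {param_device}")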

  1. Suppose I’m spawning 2 processes on 1 machine with 2 GPUs. AFAIK, each process holds a model replica created during DDP construction, and the all_reduce synchronization happens during loss.backward(). So which of the 2 processes is actually doing this reduction work? I can see that during all_reduce the 2 processes need to communicate somehow, but is there a ‘major’ process that does more work, like computing the average of the gradients?

These are the Gloo allreduce algorithms https://github.com/facebookincubator/gloo/blob/c22a5cfba94edf8ea4f53a174d38aa0c629d070f/gloo/allreduce.h#L38
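There is no ‘major’ process: all_reduce is a collective operation in which every rank participates symmetrically, and every rank ends up with the same reduced result. Conceptually, what DDP does for you inside loss.backward() is roughly the following (a hand-written sketch, not DDP’s actual bucketed implementation):

import torch.distributed as dist

def average_gradients_by_hand(model, world_size):
    # Each rank calls all_reduce on its own gradients; the sum is computed
    # collectively and every rank receives it, so no single process acts as
    # the reducer.
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size   # average rather than sum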

  1. I am using GLOO currently. I just checked Distributed communication package - torch.distributed — PyTorch 1.9.0 documentation, and GLOO on GPU has limited functionality: only 2 supported functions, broadcast and all_reduce. Are these 2 functions enough for a basic DDP example on GPU to work fine?

Yes, those two functions are enough to implement a DDP algorithm. If you are doing distributed GPU training, it is recommended to use the NCCL backend.

More information is in the distributed communication package docs: Distributed communication package - torch.distributed — PyTorch 2.1 documentation.
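If you do want to run on GPU later, a common pattern (just a sketch; it assumes MASTER_ADDR and MASTER_PORT are already set as in the tutorial) is to pick the backend based on whether CUDA is available:

import torch
import torch.distributed as dist

def init_backend(rank, world_size):
    # NCCL for GPU training, Gloo as the CPU fallback.
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend, rank=rank, world_size=world_size)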

Also, how can I check whether my all_reduce is done on GPU rather than on CPU?

For clarification, do you mean that the all_reduce algorithm is run on GPU?

And besides, can I use the GLOO backend to launch 2 processes on CPU to do DDP?

Yes, initialize a process group before creating your DDP model.

A tutorial on DDP: Getting Started with Distributed Data Parallel — PyTorch Tutorials 2.1.1+cu121 documentation.
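A minimal CPU-only run could look like this (a sketch; the worker name, model, and tensor shapes are only illustrative):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def cpu_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    model = nn.Linear(10, 5)      # stays on CPU: no .to(device)
    ddp_model = DDP(model)        # no device_ids for a CPU module

    out = ddp_model(torch.randn(20, 10))
    out.sum().backward()          # gradients are all_reduced over Gloo on CPU
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(cpu_worker, args=(2,), nprocs=2)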

Thanks for the quick reply.

I have indeed followed the link you mentioned. To initialize a process group, I used this code:

import os
import torch.distributed as dist

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

But I can’t tell which part of this initialization chooses CPU GLOO instead of GPU GLOO. Do you mean that when creating the model, just as we would without DDP, we simply don’t call model.to(device), so the model stays on the CPU?
For example:

    setup(rank, world_size)

    # create the model and leave it on CPU (i.e. no .to(device) call?)
    model = ToyModel()
    ddp_model = DDP(model)

It seems like the device_ids argument is also not needed in DDP initialization, right?

But I can’t tell which part of this initialization chooses CPU GLOO instead of GPU GLOO. Do you mean that when creating the model, just as we would without DDP, we simply don’t call model.to(device), so the model stays on the CPU?

https://pytorch.org/docs/stable/distributed.html

" torch.distributed supports three built-in backends, each with different capabilities. The table below shows which functions are available for use with CPU / CUDA tensors."

You simply initialize the backend. The model is on CPU unless moved to GPU.

It seems like the device_ids argument is also not needed in DDP initialization, right?

https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html

" * device_ids (list of python:int or torch.device) –CUDA devices. 1) For single-device modules, device_ids can contain exactly one device id, which represents the only CUDA device where the input module corresponding to this process resides. Alternatively, device_ids can also be None. 2) For multi-device modules and CPU modules, device_ids must be None.When device_ids is None for both cases, both the input data for the forward pass and the actual module must be placed on the correct device. (default: None)"


I see. Another question is regarding this piece of code:

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

The model has been moved to a GPU (with id rank) and wrapped by DDP. But when we feed in data, as in this line:

outputs = ddp_model(torch.randn(20, 10))

Shouldn’t we use torch.randn(20, 10).to(rank) instead?

DDP will move the input to the correct device automatically.
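If you want to convince yourself, you can check the device of the output (a sketch, using the GPU snippet above):

# the input tensor is created on CPU; DDP moves it onto the module's device
outputs = ddp_model(torch.randn(20, 10))
print(outputs.device)   # expected: cuda:<rank>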

So, which all_reduce algorithm does Gloo use? I saw there are ring, bcube, halving_doubling, etc. In addition, does the all_reduce run on the host, or directly on the devices?

By default, Gloo uses the ring algorithm (gloo/allreduce.cc at master · facebookincubator/gloo · GitHub). Gloo does the all_reduce on the host, while NCCL does it directly on the devices.


Hmm, but it seems that GLOO does support the all_reduce op on CUDA tensors, right? So my question is: why bother running the all_reduce on the host (you have to cudaMemcpy the CUDA tensors to CPU, do the all_reduce there, and then transfer the result back to the GPUs)? Why not do the all_reduce directly on the GPU? It seems like that would be faster, am I right?