Why do I need to use DDP when I can just use torch.distributed?

I ran into some trouble adapting torch.nn.parallel.DistributedDataParallel to my model, and I cannot easily resolve the issue.

But then I realized that I can skip wrapping my model in DDP entirely and just use torch.distributed directly:

optimizer.zero_grad()
loss.backward()
for param in simulator.parameters():
    dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
    param.grad /= dist.get_world_size()
optimizer.step()
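(For completeness, this assumes every process has already joined the same process group before any collective call; the backend and rendezvous details below are only illustrative.)

import torch.distributed as dist

# Illustrative setup only -- backend and rendezvous configuration depend on the cluster.
dist.init_process_group(backend="nccl")   # e.g. "gloo" for CPU-only runs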

What would be the disadvantage of this? Will it be slower? It seems much simpler than trying to get DDP to work and then hoping it hooks into the gradients of all parameters.

EDIT: Let me answer my own question: it’s because the Reducer in DDP is clever and collects the gradients into buckets instead of calling all_reduce on one gradient at a time. I am now trying to replicate some of this cleverness myself… if anyone has ideas on how to all_gather/all_reduce multiple tensors at once…
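One option I am looking at is the flatten/unflatten helpers in torch._utils. They are a private API, so treat this as a sketch rather than something I have benchmarked:

import torch
import torch.distributed as dist
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

# Sketch: reduce all gradients with a single collective by packing them
# into one flat buffer first (private PyTorch utilities, subject to change).
grads = [p.grad for p in simulator.parameters()]
flat = _flatten_dense_tensors(grads)
dist.all_reduce(flat, op=dist.ReduceOp.SUM)
flat /= dist.get_world_size()
for grad, synced in zip(grads, _unflatten_dense_tensors(flat, grads)):
    grad.copy_(synced)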

EDIT: Here is a slightly cleverer implementation, though it is not much faster, if at all:

optimizer.zero_grad()
loss.backward()

# Naive per-parameter version, kept here (commented out) for comparison:
# for param in simulator.parameters():
#     dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
#     param.grad /= world_size

# Flatten all gradients into one buffer so only a single all_reduce is issued.
params_to_share = []
indices = []
s = 0
for param in simulator.parameters():
    flat_grad = torch.flatten(param.grad / world_size)
    params_to_share.append(flat_grad)
    indices.append((s, s + flat_grad.shape[0]))
    s += flat_grad.shape[0]

flat = torch.cat(params_to_share)
dist.all_reduce(flat, op=dist.ReduceOp.SUM)

# Unflatten: hand each parameter its slice of the averaged buffer.
for param, (start, end) in zip(simulator.parameters(), indices):
    param.grad = flat[start:end].view(param.shape)

optimizer.step()

The DDP reducer will overlap the gradient computation with gradient communication (i.e. the all-reduces), which can hide much of the communication latency.

From your code snippet, it looks like you are flattening the gradients, all-reducing the flattened gradient, and unflattening to restore the gradients. Even though this approach only uses one all-reduce, that all-reduce is not scheduled until all gradients have been computed, meaning there is no overlap between gradient computation and communication.
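If you want some of that overlap without DDP, a rough way to approximate it is to launch an async all-reduce from a gradient hook as soon as each .grad is ready. This is just a sketch; it assumes PyTorch >= 2.1 for register_post_accumulate_grad_hook and reuses simulator, loss, and optimizer from your snippets:

import torch
import torch.distributed as dist

world_size = dist.get_world_size()
pending = []  # async all-reduce handles launched during backward

def _launch_allreduce(param):
    # Fires once this parameter's .grad has been accumulated, so the all-reduce
    # can run while gradients of earlier layers are still being computed.
    work = dist.all_reduce(param.grad, op=dist.ReduceOp.SUM, async_op=True)
    pending.append((work, param))

hook_handles = [p.register_post_accumulate_grad_hook(_launch_allreduce)
                for p in simulator.parameters()]  # register once, not every step

optimizer.zero_grad()
loss.backward()

# Wait for the outstanding collectives, then average.
for work, param in pending:
    work.wait()
    param.grad /= world_size
pending.clear()

optimizer.step()

This still misses DDP's bucketing (it issues one collective per parameter), but the collectives can now run concurrently with the rest of the backward pass.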

If you are interested in the details, feel free to take a look at the accompanying paper: https://arxiv.org/pdf/2006.15704.pdf

Thank you very much for that paper. I implemented the algorithm with my own tweaks and it works!

But now I noticed something more peculiar. Is it normal that, after sharing the gradients and calling optimizer.step(), there is a slight difference in some weight components? I guess it could just be due to floating-point arithmetic. I made sure weight initialization was the same on all machines. But even just using the simple:

optimizer.zero_grad()
loss.backward()
for param in simulator.parameters():
    dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
    param.grad /= dist.get_world_size()
optimizer.step()

results in slight variation in weights across devices.
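One quick way to see how big the variation actually is would be to compare each rank's parameters against rank 0's copy; just a sketch, with simulator as above:

import torch
import torch.distributed as dist

# Measure how far this rank's parameters have drifted from rank 0's.
max_diff = 0.0
for param in simulator.parameters():
    reference = param.detach().clone()
    dist.broadcast(reference, src=0)  # every rank receives rank 0's values
    max_diff = max(max_diff, (param.detach() - reference).abs().max().item())
print(f"rank {dist.get_rank()}: max parameter difference vs rank 0 = {max_diff:.3e}")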

EDIT: nevermind, probably have a bug in my code

Hi Stephan, looking at your question, I realized that I am trying to implement something similar. Is your implementation by any chance accessible?