Gradient Aggregation in DistributedDataParallel

Hi,

This question sits somewhere between PyTorch’s implementation of DistributedDataParallel and parameter servers in general. My “theory” source is the Dive into Deep Learning book [1].

Figure 12.7.1 on [1] suggests the following approach:
Assume batch size 32. If we have one GPU and 128 training datapoints, each epoch has 4 mini batches. After every mini batch, we update our model (i.e. there are 4 updates). Hence, we have to calculate four gradients (one per minibatch).

For the multiple GPUs case, assume two GPUs and 128 training datapoints. We feed each GPU a mini-batch, let them calculate a gradient, sum them and update our model with the sum. Hence, there are two steps instead of four involved (from a BSP point of view).
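To make the step counting above concrete, here is a minimal sketch in plain Python. The helper name `steps_per_epoch` is made up for illustration, and it assumes bulk-synchronous training where every step consumes one mini-batch per GPU. The third case (halving the per-GPU batch) is just the alternative I am asking about, not something the book prescribes.

```python
import math

def steps_per_epoch(num_samples, per_gpu_batch, num_gpus):
    # Hypothetical helper: one synchronized step consumes one mini-batch per GPU.
    samples_per_step = per_gpu_batch * num_gpus
    return math.ceil(num_samples / samples_per_step)

# One GPU, batch size 32, 128 samples: four updates per epoch.
single_gpu = steps_per_epoch(128, 32, 1)

# Two GPUs, each still fed 32 samples: two updates, effective batch size 64.
two_gpus_same_local_batch = steps_per_epoch(128, 32, 2)

# Two GPUs, per-GPU batch reduced to 16: four updates, effective batch size 32.
two_gpus_split_batch = steps_per_epoch(128, 16, 2)

print(single_gpu, two_gpus_same_local_batch, two_gpus_split_batch)
```

So whether the number of updates changes depends entirely on whether the per-GPU batch size is kept or divided by the number of GPUs.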

My questions are the following:

  1. Is my described understanding of how parameter servers work correct? In particular, I am not sure whether we keep the same batch size of 32 per GPU or have to divide the batch size by the number of GPUs.

  2. Why do we sum the gradients instead of averaging them? This is even more confusing to me, as in the DistributedDataParallel documentation of PyTorch (cannot post the link due to the link limit for new users), there is the following statement:

When a model is trained on M nodes with batch=N, the gradient will be M times smaller when compared to the same model trained on a single node with batch=M*N (because the gradients between different nodes are averaged). You should take this into consideration when you want to obtain a mathematically equivalent training process compared to the local training counterpart.

There are two things confusing here:

2.1. It states that the gradients are averaged, which does not agree with the D2L book, which states that we sum the gradients.

2.2. Until now, I always thought that in mini-batch gradient descent the loss function (optimization goal) averages the error over the mini-batch data points. Hence, if M nodes run mini-batch gradient descent with batch size N, and we take the average of their gradients, we should receive a number that is of the same order of magnitude as if 1 node runs mini-batch gradient descent with batch size N*M, as the single node averages the error functions of N*M data points for the loss function, while with M nodes we just take the average of averages.
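Written out, the "average of averages" argument looks like this. It assumes every node holds exactly N samples and uses a mean-reduced loss; the symbols \ell (per-sample loss), \theta (parameters), x_{m,i} (sample i on node m) and g_m (gradient on node m) are notation introduced here, not taken from the documentation.

```latex
g_m = \nabla_\theta \, \frac{1}{N} \sum_{i=1}^{N} \ell(x_{m,i}; \theta),
\qquad m = 1, \dots, M

\frac{1}{M} \sum_{m=1}^{M} g_m
  = \frac{1}{M N} \sum_{m=1}^{M} \sum_{i=1}^{N} \nabla_\theta \, \ell(x_{m,i}; \theta)
```

The right-hand side is exactly the gradient a single node would compute with a mean-reduced loss over all M*N samples, so under these assumptions there is no factor of M.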

I am not sure whether the DistributedDataParallel class of PyTorch can be seen as a parameter server (especially because there even is a guide on how to build a parameter server in PyTorch [3]), but it maps to what is described in the book as a parameter server.

Any help on resolving my confusion is much appreciated. Thank you very much!

Kind regards,
Maximilian

[1] 13.7. Parameter Servers — Dive into Deep Learning 1.0.3 documentation

[2] DistributedDataParallel — PyTorch 2.1 documentation

Hi,

I discussed this by email with Shen Li, one of the authors of the VLDB DDP paper, and we want to move our discussion here to make it accessible to everyone. Our past communication follows:

For the multiple GPUs case, assume two GPUs and 128 training datapoints. We feed each GPU a mini-batch, let them calculate a gradient, sum them and update our model with the sum. Hence, there are two steps instead of four involved (from a BSP point of view).

It depends. If you still want to keep the batch size at 32 for each iteration, the per-process batch size should shrink to (32 / world_size). But this is an application decision, and PyTorch won’t make the call for you.

Now, in the DDP documentation, one can find the following statement: When a model is trained on M nodes with batch=N, the gradient will be M times smaller when compared to the same model trained on a single node with batch=M*N (because the gradients between different nodes are averaged). You should take this into consideration when you want to obtain a mathematically equivalent training process compared to the local training counterpart.

It’s a two-step computation. Since AllReduce can only do sum/prod/max/min, we cannot directly use AllReduce to compute the mean. Hence, DDP uses AllReduce to sum the gradients and then divides the result by world_size.
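As a rough illustration of that two-step computation, here is a sketch in plain Python that simulates the ranks as a list of gradient vectors. The functions `all_reduce_sum` and `ddp_style_average` are hypothetical stand-ins written for this post; they are not the torch.distributed API and not the actual DDP implementation.

```python
def all_reduce_sum(per_rank_grads):
    # Stand-in for a sum all-reduce: every rank ends up with the element-wise sum.
    total = [sum(values) for values in zip(*per_rank_grads)]
    return [list(total) for _ in per_rank_grads]

def ddp_style_average(per_rank_grads):
    # Step 1: sum across ranks. Step 2: divide by the number of ranks.
    world_size = len(per_rank_grads)
    summed = all_reduce_sum(per_rank_grads)
    return [[g / world_size for g in grad] for grad in summed]

# Two simulated ranks, each holding a two-element gradient.
grads = [[1.0, 4.0], [3.0, 8.0]]
averaged = ddp_style_average(grads)
print(averaged)  # every rank holds [2.0, 6.0]
```

Skipping the division in `ddp_style_average` would leave every rank with the plain sum instead of the mean.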

Now, in the DDP documentation, one can find the following statement: When a model is trained on M nodes with batch=N, the gradient will be M times smaller when compared to the same model trained on a single node with batch=M*N (because the gradients between different nodes are averaged). You should take this into consideration when you want to obtain a mathematically equivalent training process compared to the local training counterpart.

I believe the OSS author who added this line is removing it on master:

Because it actually depends on the loss function. Check out this:

My second question is also about the above statement from the DDP documentation. In mini-batch gradient descent, the loss function (optimization goal) averages the error of the mini-batch data points. Hence, if M nodes run mini-batch gradient descent with batch size N, and we take the average of their gradients, we should receive a number that is in the same order of magnitude as if 1 node runs mini-batch gradient descent with batch size N*M, as the single node averages the error functions of N*M data points for the loss function, while with M nodes we just take the average of averages. I do not understand why the documentation says there is a change in order of magnitude. If we would sum instead of average in the DDP module, it would make sense though.

This is true, but it depends on the loss function. Nothing prevents users from using sum() as the loss function.

Lastly, I would like to ask why you do not consider the DDP module a parameter server. Isn’t it doing exactly what a PS is supposed to do?

These papers can answer that. I just list two, but many other papers also view it this way, i.e., consider AllReduce and PS as different schemes, which makes sense to me, as one is collective communication and the other is P2P.

You do provide a tutorial on how to implement a PS with the RPC framework, but I currently do not understand why this is necessary. The tutorial states that the DDP module is only to be used for single-node multi-GPU computations, but the source code of distributed.py states that multi-node computations are supported as well.

Can you point me to the line that claims DDP is single-node multi-GPU? This is certainly wrong. DP is single-node multi-GPU, but DDP can run on multiple nodes. Also, we heavily rely on the community. If you feel anything is wrong in the code or docs, please feel free to send in PRs to fix them.


  1. My first question is about the gradient reduction. Figure 12.7.1 on
    [1] suggests the following approach for gradient reduction:
    Assume batch size 32. If we have one GPU and 128 training datapoints,
    each epoch has 4 mini batches. After every mini batch, we update our
    model (i.e. there are 4 updates). Hence, we have to calculate four
    gradients (one per minibatch).

For the multiple GPUs case, assume two GPUs and 128 training datapoints.
We feed each GPU a mini-batch, let them calculate a gradient, sum them
and update our model with the sum. Hence, there are two steps instead of
four involved (from a BSP point of view).

Now, in the DDP documentation, one can find the following statement:
When a model is trained on M nodes with batch=N, the gradient will be M
times smaller when compared to the same model trained on a single node
with batch=M*N (because the gradients between different nodes are
averaged). You should take this into consideration when you want to
obtain a mathematically equivalent training process compared to the
local training counterpart.

The confusing part is that PyTorch averages the gradients. I do not
understand why this is the case, as [1] states we sum the gradients (and
that - in my opinion - makes sense as we want to parallelize the
gradient calculation and after parallel calculation go into the
direction of all gradients together).

  2. My second question is also about the above statement from the DDP
    documentation. In mini-batch gradient descent, the loss function
    (optimization goal) averages the error of the mini-batch data points.
    Hence, if M nodes run mini-batch gradient descent with batch size N, and
    we take the average of their gradients, we should receive a number that
    is in the same order of magnitude as if 1 node runs mini-batch
    gradient descent with batch size N*M, as the single node averages the
    error functions of N*M data points for the loss function, while with M
    nodes we just take the average of averages. I do not understand why the
    documentation says there is a change in order of magnitude. If we would
    sum instead of average in the DDP module, it would make sense though.

  3. Lastly, I would like to ask why you do not consider the DDP module a
    parameter server. Isn’t it doing exactly what a PS is supposed to do?
    You do provide a tutorial on how to implement a PS with the RPC
    framework, but I currently do not understand why this is necessary. The
    tutorial states that the DDP module is only to be used for single-node
    multi-GPU computations, but the source code of distributed.py
    states that multi-node computations are supported as well.

To continue the discussion, I would like to follow up:

It seems like this PR was actually fixing the documentation for DDP: Fix inaccurate note in DistributedDataParallel by 0x4f5da2 · Pull Request #47156 · pytorch/pytorch · GitHub

Now it states:
When a model is trained on M nodes with batch=N, the
gradient will be M times smaller when compared to the same model
trained on a single node with batch=M*N if the loss is summed (NOT
averaged as usual) across instances in a batch (because the gradients
between different nodes are averaged).

I do not understand the last part in brackets. I do see (and this is also what I suggested in my initial post) that if we sum the gradients (so just AllReduce with sum), the resulting gradient will be M times smaller. But I am a little confused about loss vs. gradient summing/averaging here. What we do on each node is calculate the loss per mini-batch. This loss per mini-batch is the average loss over all data points in the mini-batch, see e.g. [1] (e.g. for MSE: mse = \frac{1}{B} \sum_{i=1}^{B} (x_i - y_i)^2, where B is the mini-batch size). This results in a gradient per mini-batch (and each node handles a mini-batch, so one gradient per node). Now the question is whether we want to sum or average the gradients of the nodes.

Hence, for me it should be:
When a model is trained on M nodes with batch=N, the
gradient will be M times larger when compared to the same model
trained on a single node with batch=M*N if the gradients are summed across instances in a batch.

Because in the distributed case, you sum M gradients. In the non-distributed case, you just have one gradient which results from the average loss.

Because what we can change is whether we want to AllReduce with a sum and then divide by the total number of gradients (= mean) or just AllReduce with sum. I do not understand what the loss has to do with that, as we deal with gradients on that level.

The second thing I do not understand is why we want to average instead of summing the gradients in the first place. According to the book in my initial post, it should be summing and for me, that’s more intuitive as well. Is there a reason why we average by default?

[1] Adventures in Machine Learning


This is not accurate. In the distributed case, the processes first use AllReduce to sum the gradients and then divide the result by the world size. So, the gradient is averaged. If you need a sum instead, you can multiply the param.grad field by world_size.

I do not understand what the loss has to do with that as we deal with gradients on that level.

This is mainly for mathematical equivalence, which can affect how you tune other parameters such as the learning rate. E.g., if you use MSE as the loss_fn in local training, then the grad you get is a per-sample average. When switching to DDP in this case, you might still want the grads to be a per-sample average, and hence it should be AllReduce sum divided by world_size. However, if you use sum() as the loss_fn in local training, the grad you get is a per-batch sum. When switching to DDP (the per-process batch size should probably be N/M in this case), you might also want to keep the grad as a per-batch sum. In this case, the grad should be AllReduce sum without division.
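A small numeric check of the two cases, as a sketch in plain Python rather than real DDP code. It assumes a toy one-parameter model y = w * x with squared error per sample; the data values and the helpers `grad_w` and `ddp_grad` are made up for illustration, and the "DDP" part is simulated as sum across shards divided by world_size.

```python
def grad_w(w, xs, ys, reduction):
    # d/dw of (w*x - y)^2 is 2*(w*x - y)*x for each sample.
    per_sample = [2.0 * (w * x - y) * x for x, y in zip(xs, ys)]
    total = sum(per_sample)
    return total / len(per_sample) if reduction == "mean" else total

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 3.0, 5.0, 7.0]
w = 0.5
world_size = 2
# Global batch of 4 split into two per-process batches of 2.
shards = [(xs[0:2], ys[0:2]), (xs[2:4], ys[2:4])]

def ddp_grad(reduction):
    # Simulated DDP behaviour: sum the local gradients, then divide by world_size.
    local = [grad_w(w, sx, sy, reduction) for sx, sy in shards]
    return sum(local) / world_size

mean_local = grad_w(w, xs, ys, "mean")  # single process, mean loss
mean_ddp = ddp_grad("mean")             # matches mean_local
sum_local = grad_w(w, xs, ys, "sum")    # single process, summed loss
sum_ddp = ddp_grad("sum")               # world_size times smaller than sum_local

print(mean_local, mean_ddp, sum_local, sum_ddp)
```

With a mean-reduced loss the averaged gradient matches the single-process one; with a summed loss it is smaller by a factor of world_size, and multiplying by world_size recovers the per-batch sum.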

The second thing I do not understand is why we want to average instead of summing the gradients in the first place. According to the book in my initial post, it should be summing and for me, that’s more intuitive as well. Is there a reason why we average by default?

That’s not something that the PyTorch Distributed package can decide for you. It is the application’s choice.
