Where does the execution happen in the RPC-based parameter server tutorial?

Hello,

This nice tutorial describes how to implement a parameter server (PS) deployment with RPC.
What confuses me is that it is not clear where the execution (forward and backward passes) happens in this example… is it on the trainer machines or the PS machine?
It makes sense that these computations happen on the trainers, yet the forward pass is defined as part of the PS code, which is called remotely by the trainer! Would you please clarify this part?
Also, as far as I understand, the PS usually collects the trainers’ gradients and aggregates them (for instance, by averaging) to update the global model. Does this aggregation exist in the tutorial? If yes, would you please explain where? If no, what is the strategy to incorporate all trainers’ work into the global model?
A follow-up question on that last point: how can I apply more sophisticated aggregation functions to the gradients (let’s say I want to take the median instead of the mean on the PS machine)?

Thank you very much!

Hey @aguirguis

What confuses me is that it is not clear where the execution (forward and backward passes) happens in this example… is it on the trainer machines or the PS machine?

The execution is split between the trainer and the PS:

  • Forward pass: input (on trainer) → parameter (on PS) → output (on trainer) → loss (on trainer)

So, for each trainer, there are three pieces of the autograd graph scattered across the trainer and the PS, connected by RPC.

It makes sense that these computations happen on the trainers, yet the forward pass is defined as part of the PS code, which is called remotely by the trainer! Would you please clarify this part?

Yes. Both TrainerNet and ParameterServer have a forward function, as they are both nn.Module subclasses, and TrainerNet’s forward calls ParameterServer’s forward. As an analogy, if you have nn.Sequential(nn.Linear(10, 10)), both Sequential and Linear implement their own forward, and Sequential’s forward calls Linear’s forward. This is also why the autograd graph scatters across both the trainer and the parameter server.
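To make the analogy concrete, here is a minimal sketch of that pattern (not the tutorial’s exact code; the worker name "ps" and the make_trainer_net helper are assumptions): the trainer-side forward is essentially a thin RPC shim around the PS-side forward.

```python
import torch.nn as nn
import torch.distributed.rpc as rpc

class ParameterServer(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Linear(10, 10)   # the model parameters live on the PS process

    def forward(self, x):
        return self.net(x)             # executed on the PS

class TrainerNet(nn.Module):
    def __init__(self, ps_rref):
        super().__init__()
        self.ps_rref = ps_rref         # RRef pointing at the remote ParameterServer

    def forward(self, x):
        # Ship x to the PS, run its forward there, and get the output back,
        # much like Sequential's forward calling Linear's forward.
        return self.ps_rref.rpc_sync().forward(x)

def make_trainer_net():
    # Assumes rpc.init_rpc(...) has already been called and a worker named "ps" exists.
    ps_rref = rpc.remote("ps", ParameterServer)
    return TrainerNet(ps_rref)
```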

Also, as far as I understand, the PS usually collects the trainers’ gradients and aggregates them (for instance, by averaging) to update the global model. Does this aggregation exist in the tutorial?

No, the aggregation is not part of the tutorial. Instead, it does something similar to Hogwild! training. The aggregation method you mentioned is a form of synchronous training, where all trainers need to wait for the aggregation to finish before they can proceed. This works, and it is also possible to implement such synchronous aggregation using torch.distributed.rpc, but it won’t scale to large workloads on large clusters, as any straggler will kill the performance.

If no, what is the strategy to incorporate all trainers’ work into the global model?

One option would be to let the PS send parameters to the trainers and then get gradients back. The PS then aggregates the grads as you mentioned above and updates its parameters. It can keep doing this until the model converges. This is similar to this RL tutorial, where the agent tells the observers what to do and the agent owns the policy model.
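A rough sketch of that PS-driven loop (the worker names, the toy nn.Linear model, and the compute_grads / ps_round helpers are all illustrative assumptions, not code from either tutorial), assuming RPC is already initialized:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed.rpc as rpc

def compute_grads(params):
    # Runs on a trainer: copy the received PS parameters into a local replica,
    # run forward/backward on local data, and send the gradients back.
    model = nn.Linear(10, 2)
    with torch.no_grad():
        for local_p, p in zip(model.parameters(), params):
            local_p.copy_(p)
    data, target = torch.randn(16, 10), torch.randn(16, 2)  # stand-in for local data
    F.mse_loss(model(data), target).backward()
    return [p.grad for p in model.parameters()]

def ps_round(model, optimizer, trainer_names, aggregate):
    # Runs on the PS: push the current parameters, pull per-trainer grads,
    # aggregate them, and take a local optimizer step.
    params = [p.detach() for p in model.parameters()]
    futs = [rpc.rpc_async(name, compute_grads, args=(params,)) for name in trainer_names]
    per_trainer = [f.wait() for f in futs]        # one list of grads per trainer
    for p, grads in zip(model.parameters(), zip(*per_trainer)):
        p.grad = aggregate(list(grads))           # e.g. mean, or median (see next answer)
    optimizer.step()
    optimizer.zero_grad()
```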

A follow-up question on that last point: how can I apply more sophisticated aggregation functions to the gradients (let’s say I want to take the median instead of the mean on the PS machine)?

If you adopt the approach from the RL tutorial above, the PS can collect the gradients from the different trainers in a list and compute the median accordingly.
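For example, if the PS ends up with one gradient tensor per trainer for a given parameter, swapping the mean for an element-wise median could be as simple as this (a minimal sketch; the helper names are only for illustration):

```python
import torch

def aggregate_mean(grads):
    # grads: a list of same-shaped gradient tensors, one per trainer
    return torch.stack(grads, dim=0).mean(dim=0)

def aggregate_median(grads):
    # element-wise median across trainers instead of the mean
    return torch.stack(grads, dim=0).median(dim=0).values
```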

cc the author of the PS tutorial @rvarm1

Thanks, @mrshenli, for your detailed answers. I have a few follow-up questions:

  • Does this mean the input is sent to the PS first to propagate through the network parameters on the PS, and then, at the end, the output is sent back to the trainer?
  • What confuses me here is that the forward function of TrainerNet seems to be just a dummy, and all it does is call that of ParameterServer. As far as I understand, in the PS architecture, data never leaves the trainer machine, and the whole gradient computation should be done entirely locally on the trainer machine.
  • If you can describe all the communication that happens in one training iteration, that would be great. For instance, assume that we have one PS machine and two trainer machines. The PS has the model and each trainer has a few data samples. What is sent to whom?
  • Hogwild! assumes shared memory, so the setup is inherently different from that of the PS, right? I cannot entirely digest how/why you blend these two setups. Would you please clarify?

Thanks a lot.

Yes.

What confuses me here is that the forward function of TrainerNet seems to be just a dummy, and all it does is call that of ParameterServer.

Yes, you are right. In this specific case, as x does not require grad, there is no need to link it to the distributed autograd graph. So there are only two pieces of autograd graph, one on the PS and one on the trainer. (I was wrong when I said there were three pieces in previous comments.)

As far as I understand, in the PS architecture, data never leaves the trainer machine, and the whole gradient computation should be done entirely locally on the trainer machine.

There are different ways to implement this. Imagine there is a super large embedding table and the trainer only holds a few lookup indices in each iteration. One solution is to do all the training on the trainer, but then the application needs to implement update functions that convert the indices and gradients from the trainer back into gradients for the embedding table. Another option is to let the autograd engine take care of this: simply running the distributed backward pass from the loss on the trainer is then sufficient to produce the gradients for the embedding table on the PS.
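For instance, with the second option the table never leaves the PS and the trainer only ships lookup indices; a rough sketch of the idea (the worker name "ps", the sizes, and the demo function are assumptions, not tutorial code) could look like this:

```python
import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
import torch.distributed.autograd as dist_autograd

def remote_embedding_demo():
    # Assumes rpc.init_rpc(...) has been called and a worker named "ps" exists.
    # The large embedding table is constructed on (and stays on) the PS.
    emb_rref = rpc.remote("ps", nn.EmbeddingBag, args=(1_000_000, 64))

    with dist_autograd.context() as ctx_id:
        indices = torch.randint(0, 1_000_000, (32,))         # local lookup indices
        offsets = torch.arange(0, 32)                         # one index per bag
        out = emb_rref.rpc_sync().forward(indices, offsets)   # lookup runs on the PS
        loss = out.sum()
        # The distributed autograd engine propagates the backward pass over RPC,
        # so the embedding table's gradients are computed and stored on the PS.
        dist_autograd.backward(ctx_id, [loss])
```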

If you can describe all the communication that happens in one training iteration, that would be great. For instance, assume that we have one PS machine and two trainer machines. The PS has the model and each trainer has a few data samples. What is sent to whom?

Sure. Since the trainers are independent in that tutorial (IIUC), I will only describe what happens between one PS-trainer pair.

  • In the forward pass, there are two comms: 1) trainer → PS to send the input sample, and 2) PS → trainer to send the output.
  • In the backward pass, there is one comm: trainer → PS to send the gradients for the model outputs, which then triggers the local autograd engine on the PS to compute the gradients for the model.
  • In the optimizer step, there is one comm: trainer → PS to tell the local optimizer on the PS to update the model parameters. (It is possible to pack this into the comm during the backward pass using hooks.)

Since there are two trainers accessing the same model, the PS does not store the grads in param.grad; instead, it puts them in dedicated buffers associated with each distributed autograd context, and those grads are later consumed by the distributed optimizer.
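Putting the four comms together, one trainer iteration might look roughly like this (a sketch assuming net is the trainer-side module whose forward goes through the PS, and that the PS exposes a get_param_rrefs() helper returning RRefs to its parameters, as the tutorial's ParameterServer does):

```python
import torch.distributed.autograd as dist_autograd
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer

def train_step(net, ps_rref, loss_fn, data, target):
    # The distributed optimizer holds RRefs to the parameters living on the PS.
    opt = DistributedOptimizer(optim.SGD, ps_rref.rpc_sync().get_param_rrefs(), lr=0.05)

    with dist_autograd.context() as ctx_id:
        out = net(data)                          # comms 1+2: input to the PS, output back
        loss = loss_fn(out, target)
        dist_autograd.backward(ctx_id, [loss])   # comm 3: output grads to the PS
        opt.step(ctx_id)                         # comm 4: optimizer step on the PS, keyed by ctx_id
```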

More details about dist autograd can be found here: Distributed Autograd Design — PyTorch 2.1 documentation

Hogwild! assumes shared memory, so the setup is inherently different from that of the PS, right? I cannot entirely digest how/why you blend these two setups. Would you please clarify?

Right, the original paper mainly focused on shared memory. But the lock-free spirit can be applied to distributed training as well. This is especially useful when training on large datasets with large embedding tables.


Thanks @mrshenli for your detailed answers. Now everything is clear to me.
Probably a better design for my use case is to use a local optimizer and local autograd, as in the RL tutorial you referred to earlier.
