Concurrency concerns on the example of parameter server using RPC

Dear @mrshenli,

I noticed that your team/colleague released a new tutorial on the parameter server using the RPC framework (rpc_param_server_tutorial). I really appreciate the example and its detailed, helpful explanations, and it seems to me that it can work with multiple trainers accessing the same parameter server. I think the code below ensures that only one parameter server can be created by the trainers.

from threading import Lock

import torch.distributed.rpc as rpc

# The global parameter server instance.
param_server = None
# A lock to ensure we only have one parameter server.
global_lock = Lock()


def get_parameter_server(num_gpus=0):
    """
    Returns a singleton parameter server to all trainer processes
    """
    global param_server
    # Ensure that we get only one handle to the ParameterServer.
    with global_lock:
        if not param_server:
            # construct it once
            param_server = ParameterServer(num_gpus=num_gpus)
        return param_server

def run_parameter_server(rank, world_size):
    # The parameter server just acts as a host for the model and responds to
    # requests from trainers.
    # rpc.shutdown() will wait for all workers to complete by default, which
    # in this case means that the parameter server will wait for all trainers
    # to complete, and then exit.
    print("PS master initializing RPC")
    rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
    print("RPC initialized! Running parameter server...")
    rpc.shutdown()
    print("RPC shutdown on parameter server.")

However, when it comes to Distributed Autograd and the forward and backward passes in the training loop below:

def run_training_loop(rank, num_gpus, train_loader, test_loader):
...
    for i, (data, target) in enumerate(train_loader):
        with dist_autograd.context() as cid:
            model_output = net(data)
            target = target.to(model_output.device)
            loss = F.nll_loss(model_output, target)
            if i % 5 == 0:
                print(f"Rank {rank} training batch {i} loss {loss.item()}")
            dist_autograd.backward(cid, [loss])
            # Ensure that dist autograd ran successfully and gradients were
            # returned.
            assert remote_method(
                ParameterServer.get_dist_gradients,
                net.param_server_rref,
                cid) != {}
            opt.step(cid)

     print("Training complete!")
     print("Getting accuracy....")
     get_accuracy(test_loader, net)

How would I make sure there are no concurrency issues? For example, if there are two trainers and one is doing the forward pass while the other is doing the backward pass, how can I make sure the two processes do not conflict with each other?

Thanks,

Hey @ZiyiZhu

That’s a great question! I have two comments on this concurrent param updating approach:

  1. Each trainer has its own dedicated gradients on the parameter server. When using distributed autograd, the computed gradients are stored in the autograd context (instead of in param.grad), which is identified by the unique cid. So there is no concern about races during gradient computation.
  2. It is true that multiple trainers can update the same parameter concurrently, and it is true that the parameter might change after a trainer computes its gradients and before it applies them to the parameter. This idea is partially borrowed from the Hogwild! paper, and the approach of using “not perfectly up-to-date gradients” can also be found in other projects (e.g., PipeDream).

In general, this is a trade-off between model accuracy and training speed. In practice, we have seen several use cases where this helped to accelerate training a lot with little or no accuracy penalty.
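
To make comment 1 concrete, here is a minimal, single-worker sketch (not the tutorial's code; the model and data are placeholders) showing that each backward pass stores its gradients in its own distributed autograd context rather than in param.grad:

import os
import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc

# Single-worker RPC setup just so a distributed autograd context can be created.
os.environ.setdefault("MASTER_ADDR", "localhost")
os.environ.setdefault("MASTER_PORT", "29500")
rpc.init_rpc("trainer0", rank=0, world_size=1)

model = nn.Linear(5, 1)      # placeholder model
data = torch.randn(4, 5)     # placeholder batch

with dist_autograd.context() as cid:
    loss = model(data).sum()
    dist_autograd.backward(cid, [loss])
    # Gradients are keyed by this context id and isolated per trainer;
    # param.grad on the parameters is left untouched.
    grads = dist_autograd.get_gradients(cid)   # dict: parameter -> gradient

rpc.shutdown()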

cc @rvarm1

Dear @mrshenli,

Thank you very much for the explanations and the suggested paper references. I will follow up if I still have questions about RPC once I finish reading the papers.

Best,
Ziyi

Dear @mrshenli,

I have briefly gone through the PipeDream paper. Now I better understand how this parameter server RPC example can be implicitly pipelined, which speeds up training for model parallelism. However, I still have several questions and hope you could help answer them:

  1. This rpc_parameter_example does not seem to me to be a strict parameter server strategy for data-parallel training. Multiple trainers can update the “parameter server”, but each does so separately through distributed autograd, which means the gradients from the different trainers are never averaged, IIUC? Does a typical parameter server strategy look more like the architecture below
    [image: parameter server architecture]
    from the Scaling Distributed Machine Learning with the Parameter Server paper instead? If I want to implement that parameter server strategy (averaging the gradients from each trainer) for data parallelism, is it possible to do so with RPC?

  2. I remember that last time you mentioned that, by default, Distributed Data Parallel (DDP) uses ring allreduce for averaging the gradients. Is ring allreduce better than a parameter server all the time? But can DDP use a parameter server strategy instead?

  3. In the PipeDream paper, the workload can be fine-grained and controlled as below:
    [image: PipeDream pipeline schedule]
    Would RPC be able to do the same? Or what would be the order/priority by which RPC chooses which job (1,2,3,4 forward or 1,2,3,4 backward) to run on the GPU?

Thank you very much!
Best,

  1. I was referring to this part of the PipeDream paper:

Each backward pass in a stage results in weight updates; the next forward pass uses the latest version of weights available, and “stashes” a copy of these weights to use during the corresponding backward pass. Although the forward pass will not see updates from incomplete in-flight mini-batches, learning is still effective because model weights change relatively slowly and bounded staleness has been found effective in improving training speeds.

And the Hogwild! paper linked above also mentions something in a similar lock-free spirit:

In this work, we propose a simple strategy for eliminating the overhead associated with locking: run SGD in parallel without locks, a strategy that we call Hogwild!. In Hogwild!, processors are allowed equal access to shared memory and are able to update individual components of memory at will. Such a lock-free scheme might appear doomed to fail as processors could overwrite each other’s progress. However, when the data access is sparse, meaning that individual SGD steps only modify a small part of the decision variable, we show that memory overwrites are rare and that they introduce barely any error into the computation when they do occur. We demonstrate both theoretically and experimentally a near linear speedup with the number of processors on commonly occurring sparse learning problems.

In general, if locking is necessary (e.g., due to an unacceptable accuracy drop), applications can do so by explicitly acquiring locks, but this will certainly have an impact on training speed. So it is up to the application to decide how to handle that trade-off.
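
As a minimal sketch of the explicit-lock route (the class and method names here are illustrative, not the tutorial's API):

import threading
import torch

class TinyParamStore:
    """Toy stand-in for a parameter server that serializes updates."""
    def __init__(self, num_features=10):
        self.param = torch.zeros(num_features)
        self.update_lock = threading.Lock()

    def apply_gradients(self, grad, lr=0.03):
        # Only one trainer's update runs at a time; consistency is gained at
        # the cost of trainers waiting on each other.
        with self.update_lock:
            self.param -= lr * grad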

Is ring allreduce better than a parameter server all the time?

No. The merit of allreduce is that it can (depending on the loss function) achieve mathematical equivalence with local training. But whether synchronous training is better than asynchronous training is an open question. DistributedDataParallel in PyTorch uses allreduce, but there are also other flavors of distributed data parallel training, e.g., SlowMo uses gossip-based communication.

But can DDP use a parameter server strategy instead?

Yes, it certainly can. The Hogwild! paper linked above is one example of using parameter-server-based data parallelism on the “decision variable”, e.g., an embedding table.

The RPC package does not yet support priority-based communication/execution, but this might be doable on the application side. The callee can maintain priority queues to order incoming tasks and use an RPC argument to indicate the priority. The WIP async user function will help reduce the callee-side overhead for this use case.
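
A rough sketch of that callee-side idea (this is plain application code, not an RPC feature; the function names and the integer priority argument are made up):

import queue
import threading

# Priority queue shared by all incoming RPCs on the callee.
_tasks = queue.PriorityQueue()

def submit_task(priority, tag):
    # Trainers could call this via rpc.rpc_async("worker", submit_task, args=(p, tag));
    # lower numbers run first.
    _tasks.put((priority, tag))

def _worker_loop():
    # A single consumer pops the highest-priority task and runs it.
    while True:
        priority, tag = _tasks.get()
        print(f"running {tag} (priority {priority})")
        _tasks.task_done()

threading.Thread(target=_worker_loop, daemon=True).start()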


Hi @mrshenli,

Thank you very much again for the explanations and references. I will look into them and hopefully end up with efficient implementations on top of DDP and RPC in PyTorch.

Best,
Ziyi

Dear @mrshenli,

Following up on the Hogwild! paper mentioned previously: I also found that PyTorch has the PytorchHogwild example using multiprocessing techniques.

I would like to reproduce this and then extend it to multi-machine, distributed training, either using the existing PyTorch DDP or a custom design built on the distributed communication APIs.

The big picture is shown below:

Within Machine #0, Hogwild! can be performed. Does this make sense to you?

However, when I put together a simple example to test torch.multiprocessing locally first, it seems that the sharing via multiprocessing does not work. Please see the result below:

import os
import torch
from torch import nn
import torch.distributed as dist
import torch.multiprocessing as mp
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.__version__)
>>> 1.4.0
class Net(nn.Module):
    def __init__(self):
        super().__init__() 
        self.out = nn.Linear(in_features=5, out_features=1)

    def forward(self, t):
        # output
        t = self.out(t)
        return t
def addNet(model):
    for para in model.parameters():
        tmp = torch.ones_like(para.data)
        para.data = para.data + tmp
        print(para.data)
torch.manual_seed(101)
model = Net()
model.share_memory()
for para in model.parameters():
    print(para.data)
>>> tensor([[-0.2701, -0.0445, -0.3659,  0.3463, -0.1884]])
>>> tensor([-0.4306])
num_processes = 2
processes = []
for rank in range(num_processes):
    p = mp.Process(target=addNet, args=(model,))
    p.start()
    processes.append(p)
for p in processes:
    p.join()
>>> tensor([[0.7299, 0.9555, 0.6341, 1.3463, 0.8116]])
>>> tensor([[0.7299, 0.9555, 0.6341, 1.3463, 0.8116]])
>>> tensor([0.5694])
>>> tensor([0.5694])
for para in model.parameters():
    print(para.data)
>>> tensor([[-0.2701, -0.0445, -0.3659,  0.3463, -0.1884]])
>>> tensor([-0.4306])

It seems to me that the child processes just made a copy of the network from the parent process, and the network was not actually shared among the parent and the two child processes. Did I miss anything from PytorchHogwild? Any thoughts you could share would be very much appreciated.

Best,
Ziyi

Hey @ZiyiZhu, did you use the “spawn” start method? If you run the example as is, does it work?

Hi @mrshenli

It seems not. Even addNet is not printing anything.

I sometimes run into weird errors when using multiprocessing in a notebook. Does it work if you launch the script directly from the command line? Let me try that locally.

Oh I see, you need to use the in-place add_, something like:

import os
import torch
from torch import nn
import torch.distributed as dist
import torch.multiprocessing as mp
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.__version__)

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.out = nn.Linear(in_features=5, out_features=1)

    def forward(self, t):
        # output
        t = self.out(t)
        return t


def addNet(model):
    # Runs in each child process: add 1.0 to every parameter of the shared model.
    for para in model.parameters():
        tmp = torch.ones_like(para.data)
        with torch.no_grad():
            # The in-place add_ writes into the shared-memory storage, so the
            # parent sees the update; rebinding para.data (as in the original
            # addNet) only points the child's copy at new, unshared storage.
            para.add_(tmp)
            print(para.data)

if __name__=="__main__":
    mp.set_start_method('spawn')

    torch.manual_seed(101)
    model = Net()
    model.share_memory()
    for para in model.parameters():
        print(para.data)

    num_processes = 2
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=addNet, args=(model, ))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()


    for para in model.parameters():
        print(para.data)

Hi @mrshenli,

Thank you very much for running the code on your end. Yes, I tested it and it works when launching the script from the command line. However, it does not work in a Jupyter notebook, which is interesting.

Going back to the first question, do you see any potential problems with this?

Thank you!

Best,
Ziyi

Hi @ZiyiZhu, I think the code would run, and it is like doing multiple NCCL allreduces in a Hogwild! manner, but I am not confident about the correctness of DDP in this use case. Allreduce is not an atomic operation; it needs to go through the ring (say we are using ring allreduce here) twice to collect and propagate the values. So it is possible that, after an allreduce, the model parameters on different processes in the same group are no longer in sync. This breaks DDP’s assumption, and the gap could grow over more iterations. You might need some extra code to re-sync the DDP model parameters every n iterations.
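
Something along these lines (a sketch only, assuming a process group is already initialized; sync_every is arbitrary):

import torch
import torch.distributed as dist

def resync_parameters(model, iteration, sync_every=100, src_rank=0):
    # Every `sync_every` iterations, overwrite each rank's parameters with
    # rank 0's copy so drift from unsynchronized updates cannot accumulate.
    if iteration % sync_every != 0:
        return
    with torch.no_grad():
        for param in model.parameters():
            dist.broadcast(param.data, src=src_rank)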

I guess we can only find out by training some real models with this scheme. Async training is full of surprises.


I see, and thank you very much for the insights. I will try to re-sync after some number of iterations, or maybe use another scheme such as ps-worker instead.

Best,