About distributed autograd and optimizer

What’s the best practice for using distributed autograd and the distributed optimizer?
Is it designed for the master-slave paradigm, where a master distributes tasks to slaves, collects the results, and then performs gradient optimization?
Or is it designed for a client-host paradigm, where clients actively push results (such as gradients) to the host and the host performs post-processing like reduction?

I have read your documentation and the code in the examples. Personally I think it is the first one, though I am not sure what your future plans are. I am currently building more application layers on top of rpc, and I would like to know your design ideas.

The second question: since your rpc implementation uses the FAST & SMART mode algorithm, I have also read the detailed PR on your GitHub. I think your workers are stateful, so what will happen if some worker fails during execution of the rpc context? For example, in the scenario below from your rpc parameter server example:

    with dist_autograd.context() as cid:
        model_output = net(data)
        target = target.to(model_output.device)
        loss = F.nll_loss(model_output, target)
        if i % 5 == 0:
            print(f"Rank {rank} training batch {i} loss {loss.item()}")
        dist_autograd.backward(cid, [loss])
        # Ensure that dist autograd ran successfully and gradients were
        # returned.
        assert remote_method(
            ParameterServer.get_dist_gradients,
            net.param_server_rref,
            cid) != {}
        opt.step(cid)

What kind of exceptions will be thrown if the failure occurs in the forward & backward pass?

Hey @iffiX, thanks for the question.

They are designed so that all RPC workers are peers: each RPC worker has its own server running in the background, and any RPC worker A can talk to any other RPC worker B by sending a function to run on B. We are not going to change this design in the foreseeable future.

We do see many applications using RPC to build master-slave applications, where one worker serves as a master telling everyone else what to do. One example is this tutorial where the agent is serving as the master, and commanding all observers.

It is also possible to build server-client applications using the RPC API. Here is an example: https://github.com/pytorch/examples/pull/702/files

In general, we try to make the RPC API as flexible as possible, and hope it can support a wide range of applications. It’s up to the application developers to decide how to decompose the entire logic into multiple functions and how to stitch those functions together using RPC.

Regarding distributed autograd and distributed optimizer, the worker that creates the distributed autograd context serves as the driver for its backward pass and optimizer step. But there can be multiple drivers in a cluster. One example would be this tutorial, where we launch one Parameter-Server process that passively sits there waiting for requests from trainers and we also launch multiple trainer processes with each trainer actively running a training loop. In this case, every trainer serves as a driver of its own backward pass and optimizer step.


This is true: they are stateful, and the state includes the RRef context, the distributed autograd context, and application state. In the current version, an RPC gang cannot survive any node crash. We are actively collaborating with torchelastic to provide fault-tolerance and elasticity support.

What kind of exceptions will be thrown if the failure occurs in the forward & backward pass?

If the error is recoverable, e.g., just an exception instead of a hard node crash, RPC should throw the same exception on the caller (or a RuntimeError; this is not great, and we are still working on improvements). Below are some example tests:
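
Separately from those tests, here is a rough caller-side sketch of what handling such a recoverable error could look like. It is plain Python and does not call the RPC API. `run_with_retries` and `flaky_step` are hypothetical names made up for illustration, and `step_fn` is assumed to wrap the whole `with dist_autograd.context()` block so that each attempt starts from a fresh context. It only catches `RuntimeError`, so an application that expects the original exception type to be re-raised would need to catch that type as well.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(step_fn: Callable[[], T], max_retries: int = 3) -> T:
    """Call step_fn, retrying when it raises RuntimeError (hypothetical helper)."""
    last_error: Optional[RuntimeError] = None
    for attempt in range(1, max_retries + 1):
        try:
            return step_fn()
        except RuntimeError as err:
            # Assumed to be a recoverable error surfaced on the caller.
            last_error = err
            print(f"attempt {attempt} failed: {err}")
    raise RuntimeError(f"step failed after {max_retries} attempts") from last_error


# Stand-in for one training step; it fails twice, then succeeds.
calls = {"n": 0}


def flaky_step() -> float:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("remote worker raised")
    return 0.25


loss = run_with_retries(flaky_step)
```

Retrying like this can only help with soft failures. A hard node crash is not something the caller can retry its way out of in the current version.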


Thank you mrshenli, you are so helpful!

About my question:
Well, the first question is actually about the autograd built on rpc, not the rpc APIs themselves, but in hindsight I think it is not really a good question, so let's set it aside. The second answer is sufficient. I think I cannot use distributed autograd for now, as it is a little too brittle.

About distributed programming:
Indeed, I have noticed your amazing work on torchelastic. I have thought about your rpc design and find that it could be very hard for you to implement a mechanism that rejoins an rpc worker after a crash (whether a hard node crash or a soft crash like an exception). Since you block all processes on init to exchange their connection information, it would be overly complex to try to recover this information after a crash.

Therefore, I instead worked out another solution to provide some robustness for the raw rpc APIs, since under some conditions the “map-reduce” paradigm in torchelastic Rendezvous is not very handy, and rpc is just right. (I call it “map-reduce” because it is very similar: workers receive their portion of the work when they reach the start barrier.)

My solution basically implements a stable distributed election process & perfect failure detector based on rpc, and lets each worker in the rpc group take one or many roles. When a worker fails (hard/soft), its roles are reassigned to other workers and the worker is marked as tainted permanently. Users only need to declare the total number of workers on start and specify what a role has to do when it is initialized, running, and stopped. This adds a little overhead on start & worker failure, but near-zero overhead during a normal rpc call.
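
To make the role idea a bit more concrete, here is a minimal single-process sketch of the bookkeeping only. It is not the framework's actual code: `RoleTable`, `mark_failed`, and the role names are hypothetical, there is no election or failure detection, and the "least-loaded healthy worker" rule is just an assumption for illustration.

```python
from typing import Dict, List, Set


class RoleTable:
    """Tracks which worker owns each role (hypothetical illustration)."""

    def __init__(self, num_workers: int, roles: List[str]) -> None:
        self.healthy: Set[int] = set(range(num_workers))
        self.tainted: Set[int] = set()
        self.owner: Dict[str, int] = {}
        for role in roles:
            self._assign(role)

    def _assign(self, role: str) -> None:
        if not self.healthy:
            raise RuntimeError(f"no healthy worker left to take role {role}")
        # Count roles currently held by each healthy worker.
        load = {worker: 0 for worker in self.healthy}
        for held_by in self.owner.values():
            if held_by in load:
                load[held_by] += 1
        # Pick the least-loaded healthy worker; break ties by lowest rank.
        self.owner[role] = min(load, key=lambda worker: (load[worker], worker))

    def mark_failed(self, worker: int) -> List[str]:
        """Taint a worker permanently and hand its roles to healthy workers."""
        if worker in self.tainted:
            return []
        self.healthy.discard(worker)
        self.tainted.add(worker)
        orphaned = sorted(r for r, w in self.owner.items() if w == worker)
        for role in orphaned:
            self._assign(role)
        return orphaned


table = RoleTable(num_workers=3, roles=["learner", "sampler", "buffer"])
moved = table.mark_failed(1)  # worker 1 fails, hard or soft
print(moved, table.owner)
```

In the real setting the failure signal would come from the failure detector over rpc, and every worker would have to agree on the same table, which is the part the election process handles.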

My future plan is to integrate my framework with the NNI tuner developed by the Microsoft team, so I can get a taste of distributed RL training on Kubernetes.

That’s all for now, thank you again! :smiley:


The rendezvous in RPC init will be gone in future releases (not sure which release yet :slight_smile: ). We are thinking about using c10d::Store + listeners to support dynamic worker join/leave.

My solution basically implements a stable distributed election process & perfect failure detector based on rpc, and lets each worker in the rpc group take one or many roles. When a worker fails (hard/soft), its roles are reassigned to other workers and the worker is marked as tainted permanently. Users only need to declare the total number of workers on start and specify what a role has to do when it is initialized, running, and stopped. This adds a little overhead on start & worker failure, but near-zero overhead during a normal rpc call.

This is awesome! Is it in an open-source repo? It would be great if we could have the opportunity to learn more about it. cc @Kiuk_Chung


Dynamic worker join/leave is great! With that we can just discard failed workers and start new ones, looking forward to that.

I am still writing docs for my framework, and haven’t begun the unit tests & CI/CD process yet :rofl: :rofl:

Currently my framework aims more at RL, so tests for the RL algorithms will come first. The distributed part is mainly aimed at resource-hungry algorithms such as DQN-APEX, DDPG-APEX, IMPALA etc., and at multi-agent scenarios. I really hope to complete the code, docs and tests for the major RL algorithms before July, then complete the distributed scenarios before August / September, so you will have to wait a l-i-t-t-le more. :rofl: :rofl:

They will be in a new GitHub repo, and I will post updates as soon as possible.


Great! Looking forward to that!