RPC with raw irecv, reduce, ... distributed primitives

Is it possible to use torch.distributed.rpc (available since 1.4.0) together with the irecv, reduce, broadcast, etc. primitives? In my experience it is possible to use the send & recv primitives after the init_rpc() call, but I am not sure whether a blocking recv / reduce / all_gather call will interfere with the rpc API.

If you have any experience with this, please let me know, thank you!

Yes, this is possible, but there is one caveat. The current implementation of init_rpc() sets the default process group. It actually shouldn’t do that, and we are fixing it (see this issue). With the current RPC implementation, I see two ways to implement this feature.

Option 1

First, call init_rpc() and let it set the default process group. Then use the new_group API to create a new set of process group instances, and only call irecv/reduce/broadcast on those new process group instances. This avoids messing up the state of the default process group that RPC is running on.
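A minimal sketch of Option 1 (the worker names, port, and tensor values are illustrative, and it assumes init_rpc() still sets up the default process group as described above):

import os
import torch
import torch.distributed as dist
import torch.distributed.rpc as rpc

def run(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"

    # init_rpc() currently sets the default process group as a side effect.
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)

    # Create a separate group and issue all collective calls on it, so the
    # default group that RPC relies on is left untouched.
    collective_group = dist.new_group(ranks=list(range(world_size)))

    t = torch.ones(3) * rank
    dist.broadcast(t, src=0, group=collective_group)  # collective on the new group

    # RPC calls keep working alongside the collectives.
    fut = rpc.rpc_async(f"worker{(rank + 1) % world_size}", torch.add, args=(t, 1))
    print(fut.wait())

    rpc.shutdown()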

Option 2

Directly create a process group using its constructor. See the test below as an example:
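The test itself is not reproduced here, but a rough sketch of the direct-construction approach might look like the following, assuming the Gloo backend; the FileStore path and the ProcessGroupGloo constructor signature are assumptions and may differ across releases:

import torch
import torch.distributed as dist
import torch.distributed.rpc as rpc

def run(rank, world_size, store_path="/tmp/pg_store"):
    # Assumes MASTER_ADDR/MASTER_PORT are set in the environment for init_rpc().
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)

    # Build a Gloo process group by hand; it is independent of whatever
    # process-group state init_rpc() may have created.
    store = dist.FileStore(store_path, world_size)
    pg = dist.ProcessGroupGloo(store, rank, world_size)

    t = torch.ones(2) * rank
    pg.allreduce(t).wait()  # collectives are issued on this group directly
    print(t)

    rpc.shutdown()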

Just curious, do you mind sharing more details of your use case? What is the motivation for combining RPC with collective communications? I have seen requests for this in order to combine RPC with DDP. Is this also your use case?

Thank you for your response! Yes, I have noticed that issue on GitHub. I was also expecting to use a solution similar to your Option 1, and now, reassured by your answer, I am ready to use it.

The reason why I would like to use RPC together with collective communications is that while the RPC mechanism is great for implementing the point-to-point communication paradigm, it is not very handy when it comes to implementing algorithms where several processes are closely coupled and doing exactly the same thing in parallel; that is the job for collective communication primitives. Conversely, it is very hard to implement on-demand calling using collective communication primitives alone.

An example from my actual application is a distributed buffer used in IMPALA and APEX, where workers continuously pull models from trainers and perform rollouts. The topology of workers and trainers is preset and given by the user; active service discovery is not considered. "Loss of a single worker does not affect any other peers" is an optional, but important, feature.

I have one more question: how do you handle errors such as a worker disconnecting from the group? As far as I can tell, there is just a timeout argument in the rpc API and no explicit error-handling descriptions for RPC or collectives in the documentation. Maybe ZeroMQ is a better alternative if we are concerned about robustness?

Thanks for sharing more details.

I have one more question: how do you handle errors such as a worker disconnecting from the group?

We are working on elasticity support to allow workers to dynamically join and leave.

As far as I can tell, there is just a timeout argument in the rpc API and no explicit error-handling descriptions for RPC or collectives in the documentation.

For timeouts, per-RPC timeouts are available on the master branch for rpc_sync and rpc_async. For timeout support in remote, the following PR adds it.

no explicit error-handling descriptions for RPC or collectives in the documentation

Right, currently there is only a wait API on the returned Future object, so you will need to try-except the wait. In the C++ implementation there are error-handling APIs on the Future; let us expose them to Python as well. Besides this, do you also need something like an error listener for errors that occur in background RPC threads, or anything else?
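For example, a minimal sketch of the try-except approach mentioned above (catching RuntimeError here is an assumption, since the Python-side error API has not been finalized):

import torch
import torch.distributed.rpc as rpc

def call_remote_add(to):
    fut = rpc.rpc_async(to, torch.add, args=(torch.ones(2), 1))
    try:
        return fut.wait()      # raises if the RPC failed or timed out
    except RuntimeError as e:  # e.g. peer unreachable, timeout exceeded
        print(f"RPC to {to} failed: {e}")
        return None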

Maybe ZeroMQ is a better alternative if we are concerned about robustness?

It depends on what you need in the application. If you just need P2P communication without tensor/autograd support, then any mature RPC-like system will do. If you need autograd, would like to do direct device-to-device copies (coming in v1.6), or would like to integrate with TorchScript to speed up training, then torch.distributed.rpc might be better. The latter is a relatively new project and there are still many gaps in its features, but we'd love to drive it together with the community into better shape.

Thank you for your detailed reply. Currently, my project does not need more in-depth error handling apart from try-excepting the wait. After thorough consideration, I think RPC and collective communication are sufficient for the most necessary functions, for now. Whether to use a more mature RPC framework really depends on my users' requests, and I think it is unnecessary at this moment.

Looking forward to more updates on elasticity enhancement. :laughing:


According to the definitions of the functions _invoke_remote_builtin and _invoke_remote_python_udf in pytorch/torch/csrc/distributed/rpc/init.cpp, L557-591:

module.def(
    "_invoke_rpc_builtin",
    [](const WorkerInfo& dst,
       const std::string& opName,
       const float rpcTimeoutSeconds,
       const py::args& args,
       const py::kwargs& kwargs) {
      return std::make_shared<jit::PythonFutureWrapper>(
          pyRpcBuiltin(dst, opName, args, kwargs, rpcTimeoutSeconds));
    },
    py::call_guard<py::gil_scoped_acquire>());


These functions are called under the hood by rpc.remote, rpc.rpc_async, and rpc.rpc_sync. Is it correct to say that the rf parameter in _invoke_rpc:

def _invoke_rpc(to, func, rpc_type, args=None, kwargs=None):
    if not callable(func):
        raise TypeError("function should be callable.")

    qualified_name = torch.jit._find_builtin(func)
    dst_worker_info = _to_worker_info(to)
    # If profiling is enabled, kick off the timer and retrieve back a
    # RecordFunction instance.
    rf = None
    ...

defines the timeout in seconds for each individual RPC call?

Sorry, I mixed up the Python API code in the 1.5.0 release with the current master branch code. The master branch now provides a timeout argument in the Python API:

@_require_initialized
def rpc_sync(to, func, args=None, kwargs=None, timeout=UNSET_RPC_TIMEOUT):

Yes, the timeout argument is now available in the rpc_sync and rpc_async APIs on master. Support for the remote API will come soon.
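A small sketch of how the per-RPC timeout can be used (the 0.5-second value and the worker name are illustrative):

import torch
import torch.distributed.rpc as rpc

def add_with_deadline():
    try:
        # Raises if "worker1" does not complete the call within 0.5 seconds.
        return rpc.rpc_sync("worker1", torch.add,
                            args=(torch.ones(2), 1), timeout=0.5)
    except RuntimeError as e:
        print(f"RPC timed out or failed: {e}")
        return None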