How exactly to catch exceptions caused by RPC

@mrshenli How can exceptions thrown by rpc.rpc_async, rpc.rpc_sync and rpc.remote be caught on the caller side under the following conditions, supposing a timeout is set globally or per call (a sketch of both follows the list):

  1. during execution, the target process crashes and exits, also closing down all rpc execution threads.
  2. during execution, connection to the target process is closed
  3. during execution, the timeout limit is reached
  4. during execution, an exception is raised in the executed function
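
For concreteness, setting the timeout looks roughly like this minimal sketch (using the newer TensorPipe-era API; option and argument names differ slightly in older versions, and the worker names are illustrative):

import torch
import torch.distributed.rpc as rpc

# global timeout, in seconds, set through the backend options
opts = rpc.TensorPipeRpcBackendOptions(rpc_timeout=5.0)
rpc.init_rpc("worker0", rank=0, world_size=2, rpc_backend_options=opts)

# per-call timeout override, also in seconds
rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1), timeout=5.0)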

Based on my experiments, my partial answer is:

  1. Not known?
  2. A RuntimeError, something like “peer reset”
  3. An uncatchable std::runtime_error, something like:
terminate called after throwing an instance of 'std::runtime_error'
  what():  RPC ran for more than 5000 milliseconds and timed out.
  4. The exception thrown by the function: not the original exception, but one wrapped in a UDF exception and re-raised on the caller side.

The third one troubles me the most, because the std::runtime_error causes an ugly “Fatal Python error”:

Fatal Python error: Aborted

Thread 0x00007f916abab700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 63 in _rpc_call_remote_method
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/torch/distributed/rpc/internal.py", line 153 in _run_function

Thread 0x00007f91693a8700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 75 in _rpc_get_remote_paired_value
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/torch/distributed/rpc/internal.py", line 153 in _run_function

Thread 0x00007f9163fff700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 75 in _rpc_get_remote_paired_value
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/torch/distributed/rpc/internal.py", line 153 in _run_function

Thread 0x00007f91527fc700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/torch/distributed/rpc/api.py", line 554 in rpc_sync
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/torch/distributed/rpc/api.py", line 77 in wrapper
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 756 in _rpc_paired_class_call
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 597 in rpc_paired_class_sync
  File "/home/Administrator/iffi/Projects/machin/test/parallel/distributed/test_world.py", line 97 in main
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 46 in _exec_role
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/thread.py", line 47 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f9152ffd700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/election.py", line 423 in _task_timeout
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/thread.py", line 47 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f91537fe700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/election.py", line 435 in _task_keep_alive
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/thread.py", line 47 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f9153fff700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 297 in wait
  File "/usr/lib/python3.5/queue.py", line 173 in get
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/election.py", line 491 in _task_handle
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/thread.py", line 47 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f9160ff9700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 293 in wait
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/event.py", line 66 in wait
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/role_dispatcher.py", line 234 in _task_dispatch
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/thread.py", line 47 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f91617fa700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 293 in wait
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/event.py", line 66 in wait
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/distributed/world.py", line 302 in _task_run_dispatched_roles
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/thread.py", line 47 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f91e4362700 (most recent call first):
  File "/home/Administrator/iffi/Projects/machin/test/parallel/distributed/test_world.py", line 145 in subproc_start_world_with_roles
  File "/home/Administrator/iffi/Projects/machin/test/parallel/util_run_multi.py", line 16 in process_main
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93 in run
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/process.py", line 52 in run
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249 in _bootstrap
  File "/usr/lib/python3.5/multiprocessing/popen_fork.py", line 74 in _launch
  File "/usr/lib/python3.5/multiprocessing/popen_fork.py", line 20 in __init__
  File "/usr/lib/python3.5/multiprocessing/context.py", line 267 in _Popen
  File "/home/Administrator/iffi/Projects/machin/machin/parallel/process.py", line 25 in _Popen
  File "/usr/lib/python3.5/multiprocessing/process.py", line 105 in start
  File "/home/Administrator/iffi/Projects/machin/test/parallel/util_run_multi.py", line 27 in processes
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 788 in call_fixture_func
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 964 in pytest_fixture_setup
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 87 in <lambda>
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 914 in execute
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 584 in _compute_fixture_value
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 503 in _get_active_fixturedef
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 487 in getfixturevalue
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 477 in _fillfixtures
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/fixtures.py", line 297 in fillfixtures
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/python.py", line 1483 in setup
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 373 in prepare
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 123 in pytest_runtest_setup
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 87 in <lambda>
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 217 in <lambda>
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 244 in from_call
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 217 in call_runtest_hook
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 186 in call_and_report
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 94 in runtestprotocol
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/runner.py", line 85 in pytest_runtest_protocol
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 87 in <lambda>
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/main.py", line 272 in pytest_runtestloop
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 87 in <lambda>
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/main.py", line 247 in _main
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/main.py", line 191 in wrap_session
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/main.py", line 240 in pytest_cmdline_main
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/callers.py", line 187 in _multicall
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 87 in <lambda>
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/manager.py", line 93 in _hookexec
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/pluggy/hooks.py", line 286 in __call__
  File "/home/Administrator/iffi/Projects/machin/venv/lib/python3.5/site-packages/_pytest/config/__init__.py", line 125 in main
  File "/data/software/pycharm/pycharm-2020.1.2/plugins/python/helpers/pycharm/_jb_pytest_runner.py", line 43 in <module>

Is there any clean way to deal with the first three conditions? The fourth one is simple. And why is pybind11 not converting the third std::runtime_error into a catchable Python RuntimeError?

Hey @iffiX

  1. during execution, the target process crashes and exits, also closing down all rpc execution threads.
  2. during execution, connection to the target process is closed

Prior to v1.6, ProcessGroup is the only available backend, and it requires all processes to be alive, so the RPC gang cannot survive these failures. Even if you could catch the error in application code, it would leave subsequent RPC behavior in an undefined state, unless there is a global recovery process (we do have plans to provide this).

We will introduce the TensorPipe backend for RPC in v1.6; TensorPipe is a P2P communication library. But the first experimental version still has some parts that depend on ProcessGroup, so I think it still wouldn’t tolerate such failures in v1.6. There is also an ongoing project to provide elasticity for RPC.
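
For reference, opting into the new backend in v1.6 should look roughly like this (a sketch assuming the planned v1.6 API surface; worker name and sizes are illustrative):

import torch.distributed.rpc as rpc

# select the experimental TensorPipe backend instead of ProcessGroup
rpc.init_rpc(
    "worker0",
    rank=0,
    world_size=2,
    backend=rpc.BackendType.TENSORPIPE,
)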

cc @lcw @agolynski

  3. during execution, the timeout limit is reached

This should throw a RuntimeError for the timeout, and a try-except on RuntimeError should be able to catch it. See the code below:
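
A minimal sketch of the expected behavior (the worker names and the slow function are illustrative, and rpc.init_rpc with a 5-second timeout is assumed to have been called on both workers already):

import time
import torch.distributed.rpc as rpc

def slow_fn():
    # deliberately exceed the configured RPC timeout
    time.sleep(60)

try:
    rpc.rpc_sync("worker1", slow_fn)
except RuntimeError as e:
    # the timeout should surface here as a catchable RuntimeError
    print("caught timeout:", e)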

If this is not what you see in your application, then it is a bug that we need to fix; please let us know how we can reproduce it. Or, if you have other suggestions on how we should report the timeout error, please let us know as well.

cc @rvarm1

  4. during execution, an exception is raised in the executed function

If there is an exception during remote execution, it should be re-thrown on the caller side. See the code below:
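
A minimal sketch (again with illustrative worker names, and rpc.init_rpc assumed to have been called already):

import torch.distributed.rpc as rpc

def faulty_fn():
    raise ValueError("boom")

try:
    rpc.rpc_sync("worker1", faulty_fn)
except Exception as e:
    # the remote exception is re-raised on the caller side,
    # with the remote traceback attached to the message
    print(type(e).__name__, e)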

This is how the Python error is captured:

This is how non-user error is captured:

Please let us know if we missed any. Thanks!


BTW, which version of PyTorch are you using currently?

My local development version is 1.5.0; my test version is your latest Docker build.

:pensive: It’s really sad to hear that PyTorch RPC currently cannot handle the 1st and 2nd conditions, since that is exactly what my application code is designed for. I will try to reproduce the 3rd condition with simpler code, but that might be very difficult, since currently there is no way to log all events just before the “fatal abort” happens.

Anyway, thanks for your response! I still want to ask: what kind of P2P mechanism are you going to provide in the TensorPipe backend? Could you please clarify your and your colleagues’ plans a little bit more?

Could you please elaborate more on the requirement? We understand it is important to provide failure recovery and elasticity, but we haven’t seen people explicitly requesting this yet, so it didn’t make it into our top priorities in past releases. If this is a commonly required feature that might block many use cases, we will reprioritize work items and try to get it done sooner.

Anyway, thanks for your response! I still want to ask: what kind of P2P mechanism are you going to provide in the TensorPipe backend? Could you please clarify your and your colleagues’ plans a little bit more?

This will be released in v1.6 very soon. TensorPipe no longer requires rendezvous or full participation from all processes, so technically crashed processes do not prevent the remaining processes from functioning correctly (not yet so in v1.6, though). It should also be faster than the ProcessGroup-based RPC backend. Eventually, we will retire the ProcessGroup-based RPC backend and make the TensorPipe RPC backend the default option.

cc the main contributor of TensorPipe @lcw :slight_smile:

Actually, it’s just an experimental failure-tolerance feature. When I implemented it, I was expecting your RPC layer to behave like a normal unreliable connection: a lossy link between 2 processes will not affect any other processes; if it fails, it fails silently between these 2 processes only and throws a detectable error. It doesn’t have to be recoverable. Elasticity is also not required; currently the feature only deals with a preset number of processes. Suppose you have 100 work “roles”: you may run them with 1 process or 200 processes; the system is fully distributed and can tolerate losses, rescheduling a failed role to other healthy processes (see the sketch below).
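
To illustrate the semantics I expected, a hypothetical sketch (call_with_failover and the group API shown here are made up for illustration, not real code from my framework or from PyTorch):

def call_with_failover(group, func, roles):
    # a failed link or process should only raise a catchable error for
    # that pair, so the caller can reschedule on another healthy role
    for role in roles:
        try:
            return group.rpc_sync(to=role, func=func)
        except RuntimeError:
            continue  # this role is unreachable; try the next one
    raise RuntimeError("all candidate roles failed")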

It’s not an important core feature, although I spent quite some time implementing and testing it. I have removed it from my framework today, for now :rofl:

I have read the descriptions of TensorPipe; it’s great to have a smart backend which can choose the best way to move data around transparently. Looking forward to it. :blush:

Sorry about the inconvenience!

I see. In this case, looks like we just need to:

  1. remove the ProcessGroup usage from the TensorPipe RPC agent
  2. let TensorPipe throw proper errors.

@lcw did I miss anything?

If that is the case, I can try to restructure my code and submit this feature as a pull request to your distributed module, if you think it is valuable for your RPC module.

The feature is structured in the following way:

rpc layer <-> election layer -> role dispatcher layer <-> role runner <-> wrapped high-level APIs

The high-level APIs work like this:

WORLD                               (only initialize once, tell it what roles you want to run)
\-> create_collective_group(...)    (create a sub group which supports irecv, isend, all_gather etc.)
\-> create_rpc_group(name, roles)   (create an RPC group)
\-> get_rpc_group(name, role)       (find a group handle created by a local role or remote role)

CollectiveGroup
\-> send
\-> recv
\-> isend
\-> irecv
...
\-> barrier

RpcGroup
\-> rpc_pair(key, value)           (pair a value to the group so that it can be accessed locally or remotely)
\-> rpc_get_paired(key)            (get a paired value in this group)
\-> rpc_sync
\-> rpc_async
\-> rpc_remote
\-> rpc_paired_class_sync          (invoke a method on the paired class instance)
\-> rpc_paired_class_async
\-> rpc_paired_class_remote
\-> rpc_paired_model_sync          (perform a forward operation on the registered model)
\-> rpc_paired_model_async
\-> rpc_paired_model_remote   

Users may start a role, register a service on the role, and access that service from another role, like this:

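# note: RoleBase, get_world and World below come from the framework being
# proposed here; they are not part of torch.distributed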
class WorkerService(object):
    # An example stateful service class
    _count = 0

    def counter(self):    
        self._count += 1
        return self._count


class Worker(RoleBase):
    def __init__(self, index):
        super(Worker, self).__init__(index)
        self.service = WorkerService()
        self.group = get_world().create_rpc_group("Employees", roles=[
            "Worker:0", "Worker:1", "Worker:2"
        ])
        # expose service
        self.group.rpc_pair("worker_service", self.service)

    def main(self):
        while True:
            pass

class Manager(RoleBase):
    def __init__(self, index):
        super(Manager, self).__init__(index)
        self.group = get_world().get_rpc_group("Employees", "Worker:{}".format(index))

    def main(self):
       for i in range(10):
            self.group.rpc_paired_class_sync(
                to="Worker:{}".format(self.role_index),
                cls_method=WorkerService.counter,
                name="worker_service",
            )

if __name__ == "__main__":
    # suppose process rank is `rank`
    # suppose there are 3 processes
    # start 3 managers and 3 workers
    world = World(world_size=3, rank=rank, 
                  roles={"Worker": (Worker, 3),
                         "Manager": (Manager, 3)},
                  rpc_timeout=0.5, election_timeout=0.3, logging=True)

What do you think about this design?

Thanks a lot for this proposal!!

There was a proposal for a role-based RPC design with some similar concepts from @Kiuk_Chung; we might be able to join forces there.

@iffiX Regarding the PR submission, we are grateful for your intent to contribute! Before making decisions on the commitment, may I ask how much bandwidth you will be able to allocate to this project? Since this can be a major new feature, most likely we will need to go through the formal design proposal review --> API review --> code review process, and we will also need examples (in the pytorch/examples repo) and tutorials (in the pytorch/tutorials repo) for it. This can become a very involved effort, so we will need to make sure we have everything required to reach the finish line.


Hmm, I can do that. I will finish testing the rest of my framework first, which might take a week or so, then pour major effort into completing this; currently I am studying & working at home full time. I am really proud to hear some affirmation!

So, is there any document describing the whole proposal process? It would be much easier if there were a template to refer to.


BTW, I would also like to see the proposal for the role-based RPC design from @Kiuk_Chung :blush:

Awesome!! And yes, let’s wait for comments from @Kiuk_Chung

If this looks OK, we can then start by publishing an RFC design issue; prior RFC issues on GitHub can serve as examples. This will draw people into the discussion.

@iffiX, @mrshenli I’ll publish the torch.distributed.app proposal that we’ve reviewed internally as an RFC on GitHub shortly; I just need to format it in Markdown style.


@iffiX – here it is: https://github.com/pytorch/pytorch/issues/41425. Looking forward to collaborating on this!


@iffiX A lot of interesting stuff here, let me get to it in order.

Your points #1 (failure of a node) and #2 (failure of a link) are indeed handled poorly by the ProcessGroup backend as they bring all other nodes and links down with them. The TensorPipe backend is more resilient and only fails the affected requests. At this stage, a failed link will remain failed forever (although we’re discussing how to improve this in https://github.com/pytorch/pytorch/issues/40936, please chime in there). A failed node is harder to deal with, as nodes can be stateful and thus restarting it doesn’t mean that the other nodes can resume talking to it as if nothing had happened. The current approach chosen by Elastic is to restart all nodes when one fails. That’s drastic but safe. Also, even in the TensorPipe backend, we currently have nodes exchange their addresses at the beginning, so if a node (re-)joins later it won’t be able to make its address known to the other ones.

The other point where the TensorPipe backend performs a collective operation among all nodes is during graceful shutdown. It is an intrinsic requirement of the API that this method operates like a barrier. Currently that is implemented using process groups and thus suffers from their problems, but even if we reimplemented it on top of TensorPipe it wouldn’t be much different: a failed link/node would cause all others to fail too. I don’t see a way around it while still upholding that requirement. However, if you have your own logic on the nodes’ lifetimes, you probably can do an ungraceful shutdown and thus avoid this whole problem entirely: in that case the TensorPipe backend never uses its process group so it doesn’t matter if it fails.
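
Concretely, the ungraceful path is just a flag on the existing shutdown call:

import torch.distributed.rpc as rpc

# skip the barrier-like graceful handshake with the other workers
rpc.shutdown(graceful=False)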

As for your point #3, to me it looks like we’re raising an exception in a function marked as noexcept. If that’s the case it should be an easy fix, but it would greatly help if you could provide a C++ stack trace. Just attach GDB to the process, set up exception catching (catch throw) and then let the process resume as usual (continue). When GDB then catches an exception you can get the stack trace through backtrace. There may be harmless exceptions that are fired before that last fatal one, so please make sure you get the right one. It would also help if your build has debug symbols, I don’t know if the default PyTorch one does.

Point #4 is particularly tricky, and also relates to the ones above. Dealing with exceptions is hard because, in principle, the RPC module is available to both C++ and Python, which have different exception types, and thus it’s hard to preserve and convert between them. Moreover, the future objects have an error state which only contains a string message (see here), so there is no way to store an “error type” in there. The way the RPC module currently propagates the type from the remote UDF to the client is a bit hacky: it involves marking the future as successful, with a result value that contains the exception type and message, and then using an “unwrap” function that fires when accessing that value and raises it instead of returning it (see here). This means that an errored UDF will return a successful future, and from Python there’s no way to distinguish the two. A real failed future (which you get, for example, in case of an I/O error) will instead always raise a RuntimeError, just because it has no way of specifying anything else.
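
For clarity, a conceptual sketch of that unwrap trick, heavily simplified (an illustration only, not the actual PyTorch internals):

class RemoteException:
    # a "successful" result value that actually encodes a remote error
    def __init__(self, exc_type, msg):
        self.exc_type = exc_type
        self.msg = msg

def unwrap(result):
    # runs when the caller accesses the future's value
    if isinstance(result, RemoteException):
        raise result.exc_type(result.msg)  # re-raise instead of returning
    return result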

All these problems are in principle solvable (storing a Python exception in C++ as pybind11::error_already_set, converting a C++ exception to Python using pybind11), but this requires changes to the future class, which is a top-level utility of PyTorch also used by other modules (like JIT), and that makes it hard to change.


I think dropping all processes when one connection fails is pretty costly, but acceptable under most conditions.
I will try to collect the stack trace for point #3 once I get time.