RPC agent timeout error when not doing anything

I am using the torch.distributed.rpc library, and the RPC agent crashes if I do not make any RPC calls for 30 minutes.
Is this expected behavior of the RPC library? It seems the agent assumes something went wrong when there is no RPC traffic for 30 minutes, but in my case the lack of traffic is intentional: my code simply does not make any RPC calls during that period, and the agent still times out and crashes.

Below is a code snippet that reproduces the problematic behavior. If waiting 30 minutes is too long for testing, you can shorten the hardcoded value in torch/distributed/rpc/backend_registry.py (details in the comment inside the snippet).

import torch.distributed.rpc as rpc
from torch.multiprocessing import Process

import os
import time

def do_nothing():
    pass

def test(rank, size):
    rpc.init_rpc("Rank"+str(rank), rank=rank, world_size=size)
    print("Rank %s rpc init" % rank)

    i = 0
    # To test quickly, I changed <PATH_TO_TORCH_LIB>/torch/distributed/rpc/backend_registry.py.
    # Under def _process_group_init_backend_handler(), I changed the line
    # >> process_group_timeout = rpc_constants.DEFAULT_PROCESS_GROUP_TIMEOUT
    # (which makes the timeout 30 minutes) to a shorter value, e.g.,
    # >> process_group_timeout = datetime.timedelta(seconds=10)
    # Without this change, the same problem still occurs after waiting the full 30 minutes.

    ## Loop that does not do anything for a long time...
    while i < 10:
        time.sleep(1)
        print("Rank %s %s sec passed..." % (rank, i))

        ## Uncommenting the below two lines makes the crash go away!
        ## I.e., generating some RPC traffic.
        #target = rank ^ 0x1
        #rpc.rpc_sync("Rank"+str(target), do_nothing)
        i += 1

    rpc.shutdown()
    print("Rank %s rpc shutdown" % rank)
    pass

if __name__ == "__main__":
    os.environ['MASTER_ADDR'] = "localhost"
    os.environ['MASTER_PORT'] = "29502"

    processes = []
    for rank in [0,1]:
        p = Process(target=test, args=(rank, 2, ))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

The error message is:

[E process_group_agent.cpp:664] Encountered exception in ProcessGroupAgent::listenLoop(): [/pytorch/third_party/gloo/gloo/transport/tcp/unbound_buffer.cc:84] Timed out waiting 10000ms for recv operation to complete on worker 1. This means that the RPC agent is in an unhealthy state and unusable.

I wonder whether this is a bug, expected behavior, or incorrect use of the API on my part. If it is expected behavior, is there any workaround?

I am mainly running into this because my code has a process that does not make any RPC calls, but instead calls functions under torch.distributed, such as distributed.all_reduce().
I first tried not initializing RPC at all in those processes and calling distributed.init_process_group() directly instead.
However, this made the rpc.init_rpc() calls in the other processes hang or crash. I suspect this happens because rpc.init_rpc() calls distributed.init_process_group() internally, and the two do not play well together when some processes call init_process_group() via init_rpc() and others call it directly.
If RPC timing out after 30 minutes is expected behavior, maybe I need to find a way for some processes to call rpc.init_rpc() and others to call distributed.init_process_group() without failure.
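
For context, this is roughly the mixed initialization I attempted (a sketch from memory rather than my exact code; the rank split is just for illustration):

import os

import torch.distributed as dist
import torch.distributed.rpc as rpc
from torch.multiprocessing import Process

def worker(rank, size):
    if rank == 0:
        # This rank only needs collectives such as all_reduce, so I tried
        # skipping init_rpc here and initializing the process group directly.
        dist.init_process_group("gloo", rank=rank, world_size=size)
    else:
        # The other rank initializes RPC as usual. In my runs this combination
        # makes init_rpc() hang or crash, presumably because init_rpc() also
        # calls init_process_group() internally.
        rpc.init_rpc("Rank" + str(rank), rank=rank, world_size=size)

if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29502"

    processes = [Process(target=worker, args=(rank, 2)) for rank in [0, 1]]
    for p in processes:
        p.start()
    for p in processes:
        p.join()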

Thank you in advance.

Hey @kmaeng, there is actually RPC activity during a graceful RPC shutdown; see the linked _wait_all_workers() code. It basically uses RPCs to prevent idle processes from exiting too early.

To get around this, you can increase the default RPC timeout. Depending on the version you are using, you can either provide the timeout value in init_rpc (with v1.5), or directly call rpc._set_rpc_timeout (with v1.4).
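
For example, something along these lines (an untested sketch; I am assuming the v1.5 signature where ProcessGroupRpcBackendOptions takes rpc_timeout as a datetime.timedelta, while later releases use a float number of seconds, so please check the docs for your version):

import datetime

import torch.distributed.rpc as rpc

# Raise the default RPC timeout well above the expected idle period
# (v1.5: pass it through the backend options given to init_rpc).
# Shown for a single rank; spawn the other ranks the same way as in the snippet above.
options = rpc.ProcessGroupRpcBackendOptions(
    rpc_timeout=datetime.timedelta(hours=2),
)
rpc.init_rpc("Rank0", rank=0, world_size=2, rpc_backend_options=options)

# v1.4: set it after init_rpc instead.
# rpc._set_rpc_timeout(datetime.timedelta(hours=2))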

Or if you know for sure when a process can safely exit, you can use shutdown(graceful=False) and do the termination detection in application code.
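
For instance (a rough sketch only; the multiprocessing Barrier is just one hypothetical way for the application to detect that every rank is done before the non-graceful shutdown):

import os
from multiprocessing import Barrier

import torch.distributed.rpc as rpc
from torch.multiprocessing import Process

def worker(rank, size, done):
    rpc.init_rpc("Rank" + str(rank), rank=rank, world_size=size)

    # ... application work; the process may be RPC-idle for a long time ...

    # Application-level termination detection: wait until every rank reports
    # it is finished, then tear down the agent without the graceful handshake.
    done.wait()
    rpc.shutdown(graceful=False)

if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29502"

    done = Barrier(2)
    processes = [Process(target=worker, args=(rank, 2, done)) for rank in [0, 1]]
    for p in processes:
        p.start()
    for p in processes:
        p.join()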

Per-op (rpc_sync/rpc_async/remote) timeouts are coming soon.

Thank you for your response. I don’t fully understand your answer.
Can you clarify some points?

  1. I don't understand what you are trying to show with the code you linked. Are you saying I can use _wait_all_workers(), or a snippet of the code inside it, to work around the issue? (I am already calling rpc.shutdown() at the end, which internally calls this; it is just that the processes die before reaching it.)
    My main issue is that the RPC ProcessGroupAgent is killed after 30 minutes of being idle. Are you suggesting I just let it die and pass graceful=False so that the other processes do not die while trying to shut down RPC?

  2. I am using v1.5 and tried passing a timeout value to init_rpc, but it did not work. I followed the code and found that the 30-minute timeout only changes when I modify this line:
    https://github.com/pytorch/pytorch/blob/91f451a5e69d2969d730744e98e059d05e63a84d/torch/distributed/rpc/backend_registry.py#L114
    As you can see, that line and the ones below it take rpc_constants.DEFAULT_PROCESS_GROUP_TIMEOUT and pass it to dist.init_process_group(), ignoring the value I provided. From my testing, that hardcoded value is what produces the error I keep seeing (afterwards, when the ProcessGroupAgent is constructed, the timeout I provided is passed in, but it is not used for dist.init_process_group()).

  3. In summary, is it normal to see the error message I posted if I call init_rpc and don’t use it for 30 minutes?

Thank you for your help. I really appreciate it.

I see, sorry that I misread the original question.

In summary, is it normal to see the error message I posted if I call init_rpc and don’t use it for 30 minutes?

This is indeed a bug, and I think it is due to the following code, where the ProcessGroupAgent's recvAnysource call times out. We should have passed the RPC timeout to the process group, or set it to infinity in the listen loop.

Thanks for flagging this. Do you want to create an issue on GitHub to track it? We will fix it. Thanks!

Yes, exactly. We have a tracking issue related to this problem: Avoid modifying default process group in init_rpc · Issue #33583 · pytorch/pytorch · GitHub

I submitted an issue (https://github.com/pytorch/pytorch/issues/38531), thanks!
